View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramChannelConfig;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.InternetProtocolFamily;
29  import io.netty.channel.unix.DatagramSocketAddress;
30  import io.netty.channel.unix.Errors;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.UncheckedBooleanSupplier;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.StringUtil;
36  
37  import java.io.IOException;
38  import java.net.InetAddress;
39  import java.net.InetSocketAddress;
40  import java.net.NetworkInterface;
41  import java.net.PortUnreachableException;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.UnresolvedAddressException;
46  
47  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
48  
49  public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
50      private static final String EXPECTED_TYPES =
51              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
52                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
54                      StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
55                      StringUtil.simpleClassName(ByteBuf.class) + ')';
56  
57      private volatile boolean connected;
58      private final KQueueDatagramChannelConfig config;
59  
60      public KQueueDatagramChannel() {
61          super(null, newSocketDgram(), false);
62          config = new KQueueDatagramChannelConfig(this);
63      }
64  
65      public KQueueDatagramChannel(InternetProtocolFamily protocol) {
66          super(null, newSocketDgram(protocol), false);
67          config = new KQueueDatagramChannelConfig(this);
68      }
69  
70      public KQueueDatagramChannel(int fd) {
71          this(new BsdSocket(fd), true);
72      }
73  
74      KQueueDatagramChannel(BsdSocket socket, boolean active) {
75          super(null, socket, active);
76          config = new KQueueDatagramChannelConfig(this);
77      }
78  
79      @Override
80      public InetSocketAddress remoteAddress() {
81          return (InetSocketAddress) super.remoteAddress();
82      }
83  
84      @Override
85      public InetSocketAddress localAddress() {
86          return (InetSocketAddress) super.localAddress();
87      }
88  
89      @Override
90      @SuppressWarnings("deprecation")
91      public boolean isActive() {
92          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
93      }
94  
95      @Override
96      public boolean isConnected() {
97          return connected;
98      }
99  
100     @Override
101     public ChannelFuture joinGroup(InetAddress multicastAddress) {
102         return joinGroup(multicastAddress, newPromise());
103     }
104 
105     @Override
106     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
107         try {
108             NetworkInterface iface = config().getNetworkInterface();
109             if (iface == null) {
110                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
111             }
112             return joinGroup(multicastAddress, iface, null, promise);
113         } catch (SocketException e) {
114             promise.setFailure(e);
115         }
116         return promise;
117     }
118 
119     @Override
120     public ChannelFuture joinGroup(
121             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
122         return joinGroup(multicastAddress, networkInterface, newPromise());
123     }
124 
125     @Override
126     public ChannelFuture joinGroup(
127             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
128             ChannelPromise promise) {
129         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
130     }
131 
132     @Override
133     public ChannelFuture joinGroup(
134             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
135         return joinGroup(multicastAddress, networkInterface, source, newPromise());
136     }
137 
138     @Override
139     public ChannelFuture joinGroup(
140             final InetAddress multicastAddress, final NetworkInterface networkInterface,
141             final InetAddress source, final ChannelPromise promise) {
142 
143         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
144         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
145 
146         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
147         return promise;
148     }
149 
150     @Override
151     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
152         return leaveGroup(multicastAddress, newPromise());
153     }
154 
155     @Override
156     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
157         try {
158             return leaveGroup(
159                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
160         } catch (SocketException e) {
161             promise.setFailure(e);
162         }
163         return promise;
164     }
165 
166     @Override
167     public ChannelFuture leaveGroup(
168             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
169         return leaveGroup(multicastAddress, networkInterface, newPromise());
170     }
171 
172     @Override
173     public ChannelFuture leaveGroup(
174             InetSocketAddress multicastAddress,
175             NetworkInterface networkInterface, ChannelPromise promise) {
176         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
177     }
178 
179     @Override
180     public ChannelFuture leaveGroup(
181             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
182         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
183     }
184 
185     @Override
186     public ChannelFuture leaveGroup(
187             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
188             final ChannelPromise promise) {
189         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
190         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
191 
192         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
193 
194         return promise;
195     }
196 
197     @Override
198     public ChannelFuture block(
199             InetAddress multicastAddress, NetworkInterface networkInterface,
200             InetAddress sourceToBlock) {
201         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
202     }
203 
204     @Override
205     public ChannelFuture block(
206             final InetAddress multicastAddress, final NetworkInterface networkInterface,
207             final InetAddress sourceToBlock, final ChannelPromise promise) {
208         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
209         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
210         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
211         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
212         return promise;
213     }
214 
215     @Override
216     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
217         return block(multicastAddress, sourceToBlock, newPromise());
218     }
219 
220     @Override
221     public ChannelFuture block(
222             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
223         try {
224             return block(
225                     multicastAddress,
226                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
227                     sourceToBlock, promise);
228         } catch (Throwable e) {
229             promise.setFailure(e);
230         }
231         return promise;
232     }
233 
234     @Override
235     protected AbstractKQueueUnsafe newUnsafe() {
236         return new KQueueDatagramChannelUnsafe();
237     }
238 
239     @Override
240     protected void doBind(SocketAddress localAddress) throws Exception {
241         super.doBind(localAddress);
242         active = true;
243     }
244 
245     @Override
246     protected boolean doWriteMessage(Object msg) throws Exception {
247         final ByteBuf data;
248         InetSocketAddress remoteAddress;
249         if (msg instanceof AddressedEnvelope) {
250             @SuppressWarnings("unchecked")
251             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
252                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
253             data = envelope.content();
254             remoteAddress = envelope.recipient();
255         } else {
256             data = (ByteBuf) msg;
257             remoteAddress = null;
258         }
259 
260         final int dataLen = data.readableBytes();
261         if (dataLen == 0) {
262             return true;
263         }
264 
265         final long writtenBytes;
266         if (data.hasMemoryAddress()) {
267             long memoryAddress = data.memoryAddress();
268             if (remoteAddress == null) {
269                 try {
270                     writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
271                 } catch (Errors.NativeIoException e) {
272                     throw translateForConnected(e);
273                 }
274             } else {
275                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
276                         remoteAddress.getAddress(), remoteAddress.getPort());
277             }
278         } else if (data.nioBufferCount() > 1) {
279             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
280             array.add(data, data.readerIndex(), data.readableBytes());
281             int cnt = array.count();
282             assert cnt != 0;
283 
284             if (remoteAddress == null) {
285                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
286             } else {
287                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
288                         remoteAddress.getAddress(), remoteAddress.getPort());
289             }
290         } else {
291             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
292             if (remoteAddress == null) {
293                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
294             } else {
295                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
296                         remoteAddress.getAddress(), remoteAddress.getPort());
297             }
298         }
299 
300         return writtenBytes > 0;
301     }
302 
303     private static IOException translateForConnected(Errors.NativeIoException e) {
304         // We need to correctly translate connect errors to match NIO behaviour.
305         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
306             PortUnreachableException error = new PortUnreachableException(e.getMessage());
307             error.initCause(e);
308             return error;
309         }
310         return e;
311     }
312 
313     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
314         if (envelope.recipient() instanceof InetSocketAddress
315                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
316             throw new UnresolvedAddressException();
317         }
318     }
319 
320     @Override
321     protected Object filterOutboundMessage(Object msg) {
322         if (msg instanceof DatagramPacket) {
323             DatagramPacket packet = (DatagramPacket) msg;
324             checkUnresolved(packet);
325             ByteBuf content = packet.content();
326             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
327                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
328         }
329 
330         if (msg instanceof ByteBuf) {
331             ByteBuf buf = (ByteBuf) msg;
332             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
333         }
334 
335         if (msg instanceof AddressedEnvelope) {
336             @SuppressWarnings("unchecked")
337             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
338             checkUnresolved(e);
339 
340             if (e.content() instanceof ByteBuf &&
341                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
342 
343                 ByteBuf content = (ByteBuf) e.content();
344                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
345                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
346                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
347             }
348         }
349 
350         throw new UnsupportedOperationException(
351                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
352     }
353 
354     @Override
355     public KQueueDatagramChannelConfig config() {
356         return config;
357     }
358 
359     @Override
360     protected void doDisconnect() throws Exception {
361         socket.disconnect();
362         connected = active = false;
363         resetCachedAddresses();
364     }
365 
366     @Override
367     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
368         if (super.doConnect(remoteAddress, localAddress)) {
369             connected = true;
370             return true;
371         }
372         return false;
373     }
374 
375     @Override
376     protected void doClose() throws Exception {
377         super.doClose();
378         connected = false;
379     }
380 
381     final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
382 
383         @Override
384         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
385             assert eventLoop().inEventLoop();
386             final DatagramChannelConfig config = config();
387             if (shouldBreakReadReady(config)) {
388                 clearReadFilter0();
389                 return;
390             }
391             final ChannelPipeline pipeline = pipeline();
392             final ByteBufAllocator allocator = config.getAllocator();
393             allocHandle.reset(config);
394             readReadyBefore();
395 
396             Throwable exception = null;
397             try {
398                 ByteBuf byteBuf = null;
399                 try {
400                     boolean connected = isConnected();
401                     do {
402                         byteBuf = allocHandle.allocate(allocator);
403                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
404 
405                         final DatagramPacket packet;
406                         if (connected) {
407                             try {
408                                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
409                             } catch (Errors.NativeIoException e) {
410                                 // We need to correctly translate connect errors to match NIO behaviour.
411                                 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
412                                     PortUnreachableException error = new PortUnreachableException(e.getMessage());
413                                     error.initCause(e);
414                                     throw error;
415                                 }
416                                 throw e;
417                             }
418                             if (allocHandle.lastBytesRead() <= 0) {
419                                 // nothing was read, release the buffer.
420                                 byteBuf.release();
421                                 byteBuf = null;
422                                 break;
423                             }
424                             packet = new DatagramPacket(byteBuf,
425                                     (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
426                         } else {
427                             final DatagramSocketAddress remoteAddress;
428                             if (byteBuf.hasMemoryAddress()) {
429                                 // has a memory address so use optimized call
430                                 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
431                                         byteBuf.capacity());
432                             } else {
433                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
434                                         byteBuf.writerIndex(), byteBuf.writableBytes());
435                                 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
436                             }
437 
438                             if (remoteAddress == null) {
439                                 allocHandle.lastBytesRead(-1);
440                                 byteBuf.release();
441                                 byteBuf = null;
442                                 break;
443                             }
444                             InetSocketAddress localAddress = remoteAddress.localAddress();
445                             if (localAddress == null) {
446                                 localAddress = (InetSocketAddress) localAddress();
447                             }
448                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
449                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
450 
451                             packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
452                         }
453 
454                         allocHandle.incMessagesRead(1);
455 
456                         readPending = false;
457                         pipeline.fireChannelRead(packet);
458 
459                         byteBuf = null;
460 
461                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
462                     // as we read anything).
463                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
464                 } catch (Throwable t) {
465                     if (byteBuf != null) {
466                         byteBuf.release();
467                     }
468                     exception = t;
469                 }
470 
471                 allocHandle.readComplete();
472                 pipeline.fireChannelReadComplete();
473 
474                 if (exception != null) {
475                     pipeline.fireExceptionCaught(exception);
476                 }
477             } finally {
478                 readReadyFinally(config);
479             }
480         }
481     }
482 }