View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramChannelConfig;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.InternetProtocolFamily;
29  import io.netty.channel.unix.DatagramSocketAddress;
30  import io.netty.channel.unix.Errors;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.UncheckedBooleanSupplier;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.StringUtil;
36  
37  import java.net.InetAddress;
38  import java.net.InetSocketAddress;
39  import java.net.NetworkInterface;
40  import java.net.PortUnreachableException;
41  import java.net.SocketAddress;
42  import java.net.SocketException;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.UnresolvedAddressException;
45  
46  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
47  
48  public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
    // Suffix appended to the "unsupported message type" error thrown by filterOutboundMessage;
    // it spells out the message types this channel accepts for writing.
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
                    StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
                    StringUtil.simpleClassName(ByteBuf.class) + ", " +
                    StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
                    StringUtil.simpleClassName(ByteBuf.class) + ')';

    // Set to true when doConnect succeeds; reset to false by doDisconnect and doClose.
    private volatile boolean connected;
    // Configuration object of this channel; assigned once by the constructors.
    private final KQueueDatagramChannelConfig config;
58  
59      public KQueueDatagramChannel() {
60          super(null, newSocketDgram(), false);
61          config = new KQueueDatagramChannelConfig(this);
62      }
63  
64      public KQueueDatagramChannel(InternetProtocolFamily protocol) {
65          super(null, newSocketDgram(protocol), false);
66          config = new KQueueDatagramChannelConfig(this);
67      }
68  
69      public KQueueDatagramChannel(int fd) {
70          this(new BsdSocket(fd), true);
71      }
72  
73      KQueueDatagramChannel(BsdSocket socket, boolean active) {
74          super(null, socket, active);
75          config = new KQueueDatagramChannelConfig(this);
76      }
77  
78      @Override
79      public InetSocketAddress remoteAddress() {
80          return (InetSocketAddress) super.remoteAddress();
81      }
82  
83      @Override
84      public InetSocketAddress localAddress() {
85          return (InetSocketAddress) super.localAddress();
86      }
87  
88      @Override
89      @SuppressWarnings("deprecation")
90      public boolean isActive() {
91          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
92      }
93  
    /**
     * Returns the {@code connected} flag, which is set by a successful {@code doConnect} and
     * cleared by {@code doDisconnect} and {@code doClose}.
     */
    @Override
    public boolean isConnected() {
        return connected;
    }
98  
99      @Override
100     public ChannelFuture joinGroup(InetAddress multicastAddress) {
101         return joinGroup(multicastAddress, newPromise());
102     }
103 
104     @Override
105     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
106         try {
107             NetworkInterface iface = config().getNetworkInterface();
108             if (iface == null) {
109                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
110             }
111             return joinGroup(multicastAddress, iface, null, promise);
112         } catch (SocketException e) {
113             promise.setFailure(e);
114         }
115         return promise;
116     }
117 
118     @Override
119     public ChannelFuture joinGroup(
120             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
121         return joinGroup(multicastAddress, networkInterface, newPromise());
122     }
123 
124     @Override
125     public ChannelFuture joinGroup(
126             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
127             ChannelPromise promise) {
128         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
129     }
130 
131     @Override
132     public ChannelFuture joinGroup(
133             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
134         return joinGroup(multicastAddress, networkInterface, source, newPromise());
135     }
136 
137     @Override
138     public ChannelFuture joinGroup(
139             final InetAddress multicastAddress, final NetworkInterface networkInterface,
140             final InetAddress source, final ChannelPromise promise) {
141 
142         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
143         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
144 
145         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
146         return promise;
147     }
148 
149     @Override
150     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
151         return leaveGroup(multicastAddress, newPromise());
152     }
153 
154     @Override
155     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
156         try {
157             return leaveGroup(
158                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
159         } catch (SocketException e) {
160             promise.setFailure(e);
161         }
162         return promise;
163     }
164 
165     @Override
166     public ChannelFuture leaveGroup(
167             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
168         return leaveGroup(multicastAddress, networkInterface, newPromise());
169     }
170 
171     @Override
172     public ChannelFuture leaveGroup(
173             InetSocketAddress multicastAddress,
174             NetworkInterface networkInterface, ChannelPromise promise) {
175         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
176     }
177 
178     @Override
179     public ChannelFuture leaveGroup(
180             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
181         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
182     }
183 
184     @Override
185     public ChannelFuture leaveGroup(
186             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
187             final ChannelPromise promise) {
188         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
189         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
190 
191         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
192 
193         return promise;
194     }
195 
196     @Override
197     public ChannelFuture block(
198             InetAddress multicastAddress, NetworkInterface networkInterface,
199             InetAddress sourceToBlock) {
200         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
201     }
202 
203     @Override
204     public ChannelFuture block(
205             final InetAddress multicastAddress, final NetworkInterface networkInterface,
206             final InetAddress sourceToBlock, final ChannelPromise promise) {
207         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
208         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
209         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
210         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
211         return promise;
212     }
213 
214     @Override
215     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
216         return block(multicastAddress, sourceToBlock, newPromise());
217     }
218 
219     @Override
220     public ChannelFuture block(
221             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
222         try {
223             return block(
224                     multicastAddress,
225                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
226                     sourceToBlock, promise);
227         } catch (Throwable e) {
228             promise.setFailure(e);
229         }
230         return promise;
231     }
232 
233     @Override
234     protected AbstractKQueueUnsafe newUnsafe() {
235         return new KQueueDatagramChannelUnsafe();
236     }
237 
    /**
     * Binds through the superclass and then sets the {@code active} flag.
     *
     * @param localAddress the address passed on to the superclass bind
     * @throws Exception whatever the superclass bind throws; the flag is left untouched then
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        super.doBind(localAddress);
        // Only reached when the bind above did not throw.
        active = true;
    }
243 
244     @Override
245     protected boolean doWriteMessage(Object msg) throws Exception {
246         final ByteBuf data;
247         InetSocketAddress remoteAddress;
248         if (msg instanceof AddressedEnvelope) {
249             @SuppressWarnings("unchecked")
250             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
251                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
252             data = envelope.content();
253             remoteAddress = envelope.recipient();
254         } else {
255             data = (ByteBuf) msg;
256             remoteAddress = null;
257         }
258 
259         final int dataLen = data.readableBytes();
260         if (dataLen == 0) {
261             return true;
262         }
263 
264         final long writtenBytes;
265         if (data.hasMemoryAddress()) {
266             long memoryAddress = data.memoryAddress();
267             if (remoteAddress == null) {
268                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
269             } else {
270                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
271                         remoteAddress.getAddress(), remoteAddress.getPort());
272             }
273         } else if (data.nioBufferCount() > 1) {
274             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
275             array.add(data, data.readerIndex(), data.readableBytes());
276             int cnt = array.count();
277             assert cnt != 0;
278 
279             if (remoteAddress == null) {
280                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
281             } else {
282                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
283                         remoteAddress.getAddress(), remoteAddress.getPort());
284             }
285         } else {
286             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
287             if (remoteAddress == null) {
288                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
289             } else {
290                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
291                         remoteAddress.getAddress(), remoteAddress.getPort());
292             }
293         }
294 
295         return writtenBytes > 0;
296     }
297 
298     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
299         if (envelope.recipient() instanceof InetSocketAddress
300                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
301             throw new UnresolvedAddressException();
302         }
303     }
304 
305     @Override
306     protected Object filterOutboundMessage(Object msg) {
307         if (msg instanceof DatagramPacket) {
308             DatagramPacket packet = (DatagramPacket) msg;
309             checkUnresolved(packet);
310             ByteBuf content = packet.content();
311             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
312                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
313         }
314 
315         if (msg instanceof ByteBuf) {
316             ByteBuf buf = (ByteBuf) msg;
317             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
318         }
319 
320         if (msg instanceof AddressedEnvelope) {
321             @SuppressWarnings("unchecked")
322             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
323             checkUnresolved(e);
324 
325             if (e.content() instanceof ByteBuf &&
326                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
327 
328                 ByteBuf content = (ByteBuf) e.content();
329                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
330                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
331                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
332             }
333         }
334 
335         throw new UnsupportedOperationException(
336                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
337     }
338 
    /**
     * Returns the {@link KQueueDatagramChannelConfig} created for this channel by its constructor.
     */
    @Override
    public KQueueDatagramChannelConfig config() {
        return config;
    }
343 
344     @Override
345     protected void doDisconnect() throws Exception {
346         socket.disconnect();
347         connected = active = false;
348         resetCachedAddresses();
349     }
350 
351     @Override
352     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
353         if (super.doConnect(remoteAddress, localAddress)) {
354             connected = true;
355             return true;
356         }
357         return false;
358     }
359 
    /**
     * Closes through the superclass and then clears the {@code connected} flag.
     *
     * @throws Exception whatever the superclass close throws; the flag is left untouched then
     */
    @Override
    protected void doClose() throws Exception {
        super.doClose();
        // Only reached when the close above did not throw.
        connected = false;
    }
365 
    /**
     * {@link AbstractKQueueUnsafe} of this channel: reads datagrams from the socket and fires each
     * one through the pipeline as a {@link DatagramPacket}.
     */
    final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {

        /**
         * Reads datagrams in a loop and fires a {@link DatagramPacket} for each one.
         *
         * <p>The loop ends when a read yields nothing, when the allocator handle declines to
         * continue, or when something is thrown. Afterwards {@code channelReadComplete} is fired,
         * followed by {@code exceptionCaught} if a throwable was captured.
         * {@code readReadyFinally} runs in every case once the read setup has been done.
         *
         * @param allocHandle the handle used to allocate receive buffers and to track the read
         */
        @Override
        void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
            assert eventLoop().inEventLoop();
            final DatagramChannelConfig config = config();
            // Bail out before touching the socket; the read filter is cleared on this path.
            if (shouldBreakReadReady(config)) {
                clearReadFilter0();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            allocHandle.reset(config);
            readReadyBefore();

            // Captured in the loop below and reported only after channelReadComplete was fired.
            Throwable exception = null;
            try {
                // Buffer currently owned by this method; null whenever nothing has to be released.
                ByteBuf byteBuf = null;
                try {
                    // Sampled once; selects the read path for every iteration of the loop.
                    boolean connected = isConnected();
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.attemptedBytesRead(byteBuf.writableBytes());

                        final DatagramPacket packet;
                        if (connected) {
                            try {
                                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                            } catch (Errors.NativeIoException e) {
                                // We need to correctly translate connect errors to match NIO behaviour.
                                if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
                                    PortUnreachableException error = new PortUnreachableException(e.getMessage());
                                    error.initCause(e);
                                    throw error;
                                }
                                throw e;
                            }
                            if (allocHandle.lastBytesRead() <= 0) {
                                // nothing was read, release the buffer.
                                byteBuf.release();
                                byteBuf = null;
                                break;
                            }
                            // Connected path: the channel's own local and remote addresses are used.
                            packet = new DatagramPacket(byteBuf,
                                    (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
                        } else {
                            final DatagramSocketAddress remoteAddress;
                            if (byteBuf.hasMemoryAddress()) {
                                // has a memory address so use optimized call
                                remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
                                        byteBuf.capacity());
                            } else {
                                ByteBuffer nioData = byteBuf.internalNioBuffer(
                                        byteBuf.writerIndex(), byteBuf.writableBytes());
                                remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
                            }

                            if (remoteAddress == null) {
                                // No sender address came back: record -1, drop the buffer and stop reading.
                                allocHandle.lastBytesRead(-1);
                                byteBuf.release();
                                byteBuf = null;
                                break;
                            }
                            // Use the local address carried by the result, else the channel's local address.
                            InetSocketAddress localAddress = remoteAddress.localAddress();
                            if (localAddress == null) {
                                localAddress = (InetSocketAddress) localAddress();
                            }
                            allocHandle.lastBytesRead(remoteAddress.receivedAmount());
                            // Advance the writer index by the number of bytes just recorded as read.
                            byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());

                            packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
                        }

                        allocHandle.incMessagesRead(1);

                        readPending = false;
                        pipeline.fireChannelRead(packet);

                        // The buffer now travels with the fired packet; clear the reference so the
                        // catch block below does not release it.
                        byteBuf = null;

                    // We use the TRUE_SUPPLIER as it is also ok to read less than what we did try to read (as long
                    // as we read anything).
                    } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
                } catch (Throwable t) {
                    // Release a buffer that was allocated but not yet handed to the pipeline.
                    if (byteBuf != null) {
                        byteBuf.release();
                    }
                    exception = t;
                }

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    pipeline.fireExceptionCaught(exception);
                }
            } finally {
                readReadyFinally(config);
            }
        }
    }
466     }
467 }