/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramChannelConfig;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.InternetProtocolFamily;
29  import io.netty.channel.socket.SocketProtocolFamily;
30  import io.netty.channel.unix.DatagramSocketAddress;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.IovArray;
33  import io.netty.channel.unix.UnixChannelUtil;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.ObjectUtil;
36  import io.netty.util.internal.StringUtil;
37  
38  import java.net.InetAddress;
39  import java.net.InetSocketAddress;
40  import java.net.NetworkInterface;
41  import java.net.PortUnreachableException;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.UnresolvedAddressException;
46  
47  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
48  
49  public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
50      private static final String EXPECTED_TYPES =
51              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
52                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
54                      StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
55                      StringUtil.simpleClassName(ByteBuf.class) + ')';
56  
57      private volatile boolean connected;
58      private final KQueueDatagramChannelConfig config;
59  
60      public KQueueDatagramChannel() {
61          super(null, newSocketDgram(), false);
62          config = new KQueueDatagramChannelConfig(this);
63      }
64  
65      /**
66       * @deprecated use {@link KQueueDatagramChannel#KQueueDatagramChannel(SocketProtocolFamily)}
67       */
68      @Deprecated
69      public KQueueDatagramChannel(InternetProtocolFamily protocol) {
70          super(null, newSocketDgram(protocol), false);
71          config = new KQueueDatagramChannelConfig(this);
72      }
73  
74      public KQueueDatagramChannel(SocketProtocolFamily protocol) {
75          super(null, newSocketDgram(protocol), false);
76          config = new KQueueDatagramChannelConfig(this);
77      }
78  
79      public KQueueDatagramChannel(int fd) {
80          this(new BsdSocket(fd), true);
81      }
82  
83      KQueueDatagramChannel(BsdSocket socket, boolean active) {
84          super(null, socket, active);
85          config = new KQueueDatagramChannelConfig(this);
86      }
87  
88      @Override
89      public InetSocketAddress remoteAddress() {
90          return (InetSocketAddress) super.remoteAddress();
91      }
92  
93      @Override
94      public InetSocketAddress localAddress() {
95          return (InetSocketAddress) super.localAddress();
96      }
97  
98      @Override
99      @SuppressWarnings("deprecation")
100     public boolean isActive() {
101         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
102     }
103 
104     @Override
105     public boolean isConnected() {
106         return connected;
107     }
108 
109     @Override
110     public ChannelFuture joinGroup(InetAddress multicastAddress) {
111         return joinGroup(multicastAddress, newPromise());
112     }
113 
114     @Override
115     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
116         try {
117             NetworkInterface iface = config().getNetworkInterface();
118             if (iface == null) {
119                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
120             }
121             return joinGroup(multicastAddress, iface, null, promise);
122         } catch (SocketException e) {
123             promise.setFailure(e);
124         }
125         return promise;
126     }
127 
128     @Override
129     public ChannelFuture joinGroup(
130             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
131         return joinGroup(multicastAddress, networkInterface, newPromise());
132     }
133 
134     @Override
135     public ChannelFuture joinGroup(
136             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
137             ChannelPromise promise) {
138         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
139     }
140 
141     @Override
142     public ChannelFuture joinGroup(
143             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
144         return joinGroup(multicastAddress, networkInterface, source, newPromise());
145     }
146 
147     @Override
148     public ChannelFuture joinGroup(
149             final InetAddress multicastAddress, final NetworkInterface networkInterface,
150             final InetAddress source, final ChannelPromise promise) {
151 
152         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
153         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
154 
155         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
156         return promise;
157     }
158 
159     @Override
160     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
161         return leaveGroup(multicastAddress, newPromise());
162     }
163 
164     @Override
165     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
166         try {
167             return leaveGroup(
168                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
169         } catch (SocketException e) {
170             promise.setFailure(e);
171         }
172         return promise;
173     }
174 
175     @Override
176     public ChannelFuture leaveGroup(
177             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
178         return leaveGroup(multicastAddress, networkInterface, newPromise());
179     }
180 
181     @Override
182     public ChannelFuture leaveGroup(
183             InetSocketAddress multicastAddress,
184             NetworkInterface networkInterface, ChannelPromise promise) {
185         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
186     }
187 
188     @Override
189     public ChannelFuture leaveGroup(
190             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
191         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
192     }
193 
194     @Override
195     public ChannelFuture leaveGroup(
196             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
197             final ChannelPromise promise) {
198         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
199         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
200 
201         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
202 
203         return promise;
204     }
205 
206     @Override
207     public ChannelFuture block(
208             InetAddress multicastAddress, NetworkInterface networkInterface,
209             InetAddress sourceToBlock) {
210         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
211     }
212 
213     @Override
214     public ChannelFuture block(
215             final InetAddress multicastAddress, final NetworkInterface networkInterface,
216             final InetAddress sourceToBlock, final ChannelPromise promise) {
217         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
218         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
219         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
220         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
221         return promise;
222     }
223 
224     @Override
225     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
226         return block(multicastAddress, sourceToBlock, newPromise());
227     }
228 
229     @Override
230     public ChannelFuture block(
231             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
232         try {
233             return block(
234                     multicastAddress,
235                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
236                     sourceToBlock, promise);
237         } catch (Throwable e) {
238             promise.setFailure(e);
239         }
240         return promise;
241     }
242 
243     @Override
244     protected AbstractKQueueUnsafe newUnsafe() {
245         return new KQueueDatagramChannelUnsafe();
246     }
247 
248     @Override
249     protected void doBind(SocketAddress localAddress) throws Exception {
250         super.doBind(localAddress);
251         active = true;
252     }
253 
254     @Override
255     protected boolean doWriteMessage(Object msg) throws Exception {
256         final ByteBuf data;
257         InetSocketAddress remoteAddress;
258         if (msg instanceof AddressedEnvelope) {
259             @SuppressWarnings("unchecked")
260             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
261                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
262             data = envelope.content();
263             remoteAddress = envelope.recipient();
264         } else {
265             data = (ByteBuf) msg;
266             remoteAddress = null;
267         }
268 
269         final int dataLen = data.readableBytes();
270         if (dataLen == 0) {
271             return true;
272         }
273 
274         final long writtenBytes;
275         if (data.hasMemoryAddress()) {
276             long memoryAddress = data.memoryAddress();
277             if (remoteAddress == null) {
278                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
279             } else {
280                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
281                         remoteAddress.getAddress(), remoteAddress.getPort());
282             }
283         } else if (data.nioBufferCount() > 1) {
284             IovArray array = registration().ioHandler().cleanArray();
285             array.add(data, data.readerIndex(), data.readableBytes());
286             int cnt = array.count();
287             assert cnt != 0;
288 
289             if (remoteAddress == null) {
290                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
291             } else {
292                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
293                         remoteAddress.getAddress(), remoteAddress.getPort());
294             }
295         } else {
296             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
297             if (remoteAddress == null) {
298                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
299             } else {
300                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
301                         remoteAddress.getAddress(), remoteAddress.getPort());
302             }
303         }
304 
305         return writtenBytes > 0;
306     }
307 
308     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
309         if (envelope.recipient() instanceof InetSocketAddress
310                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
311             throw new UnresolvedAddressException();
312         }
313     }
314 
315     @Override
316     protected Object filterOutboundMessage(Object msg) {
317         if (msg instanceof DatagramPacket) {
318             DatagramPacket packet = (DatagramPacket) msg;
319             checkUnresolved(packet);
320             ByteBuf content = packet.content();
321             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
322                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
323         }
324 
325         if (msg instanceof ByteBuf) {
326             ByteBuf buf = (ByteBuf) msg;
327             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
328         }
329 
330         if (msg instanceof AddressedEnvelope) {
331             @SuppressWarnings("unchecked")
332             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
333             checkUnresolved(e);
334 
335             if (e.content() instanceof ByteBuf &&
336                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
337 
338                 ByteBuf content = (ByteBuf) e.content();
339                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
340                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
341                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
342             }
343         }
344 
345         throw new UnsupportedOperationException(
346                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
347     }
348 
349     @Override
350     public KQueueDatagramChannelConfig config() {
351         return config;
352     }
353 
354     @Override
355     protected void doDisconnect() throws Exception {
356         socket.disconnect();
357         connected = active = false;
358         resetCachedAddresses();
359     }
360 
361     @Override
362     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
363         if (super.doConnect(remoteAddress, localAddress)) {
364             connected = true;
365             return true;
366         }
367         return false;
368     }
369 
370     @Override
371     protected void doClose() throws Exception {
372         super.doClose();
373         connected = false;
374     }
375 
376     final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
377 
378         @Override
379         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
380             assert eventLoop().inEventLoop();
381             final DatagramChannelConfig config = config();
382             if (shouldBreakReadReady(config)) {
383                 clearReadFilter0();
384                 return;
385             }
386             final ChannelPipeline pipeline = pipeline();
387             final ByteBufAllocator allocator = config.getAllocator();
388             allocHandle.reset(config);
389             readReadyBefore();
390 
391             Throwable exception = null;
392             try {
393                 ByteBuf byteBuf = null;
394                 try {
395                     boolean connected = isConnected();
396                     do {
397                         byteBuf = allocHandle.allocate(allocator);
398                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
399 
400                         final DatagramPacket packet;
401                         if (connected) {
402                             try {
403                                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
404                             } catch (Errors.NativeIoException e) {
405                                 // We need to correctly translate connect errors to match NIO behaviour.
406                                 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
407                                     PortUnreachableException error = new PortUnreachableException(e.getMessage());
408                                     error.initCause(e);
409                                     throw error;
410                                 }
411                                 throw e;
412                             }
413                             if (allocHandle.lastBytesRead() <= 0) {
414                                 // nothing was read, release the buffer.
415                                 byteBuf.release();
416                                 byteBuf = null;
417                                 break;
418                             }
419                             packet = new DatagramPacket(byteBuf,
420                                     (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
421                         } else {
422                             final DatagramSocketAddress remoteAddress;
423                             if (byteBuf.hasMemoryAddress()) {
424                                 // has a memory address so use optimized call
425                                 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
426                                         byteBuf.capacity());
427                             } else {
428                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
429                                         byteBuf.writerIndex(), byteBuf.writableBytes());
430                                 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
431                             }
432 
433                             if (remoteAddress == null) {
434                                 allocHandle.lastBytesRead(-1);
435                                 byteBuf.release();
436                                 byteBuf = null;
437                                 break;
438                             }
439                             InetSocketAddress localAddress = remoteAddress.localAddress();
440                             if (localAddress == null) {
441                                 localAddress = (InetSocketAddress) localAddress();
442                             }
443                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
444                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
445 
446                             packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
447                         }
448 
449                         allocHandle.incMessagesRead(1);
450 
451                         readPending = false;
452                         pipeline.fireChannelRead(packet);
453 
454                         byteBuf = null;
455 
456                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
457                     // as we read anything).
458                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
459                 } catch (Throwable t) {
460                     if (byteBuf != null) {
461                         byteBuf.release();
462                     }
463                     exception = t;
464                 }
465 
466                 allocHandle.readComplete();
467                 pipeline.fireChannelReadComplete();
468 
469                 if (exception != null) {
470                     pipeline.fireExceptionCaught(exception);
471                 }
472             } finally {
473                 readReadyFinally(config);
474             }
475         }
476     }
477 }