View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramChannelConfig;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.InternetProtocolFamily;
29  import io.netty.channel.socket.SocketProtocolFamily;
30  import io.netty.channel.unix.DatagramSocketAddress;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.IovArray;
33  import io.netty.channel.unix.UnixChannelUtil;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.ObjectUtil;
36  import io.netty.util.internal.StringUtil;
37  
38  import java.io.IOException;
39  import java.net.InetAddress;
40  import java.net.InetSocketAddress;
41  import java.net.NetworkInterface;
42  import java.net.PortUnreachableException;
43  import java.net.SocketAddress;
44  import java.net.SocketException;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.UnresolvedAddressException;
47  
48  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
49  
50  public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
55                      StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56                      StringUtil.simpleClassName(ByteBuf.class) + ')';
57  
58      private volatile boolean connected;
59      private final KQueueDatagramChannelConfig config;
60  
61      public KQueueDatagramChannel() {
62          super(null, newSocketDgram(), false);
63          config = new KQueueDatagramChannelConfig(this);
64      }
65  
66      /**
67       * @deprecated use {@link KQueueDatagramChannel#KQueueDatagramChannel(SocketProtocolFamily)}
68       */
69      @Deprecated
70      public KQueueDatagramChannel(InternetProtocolFamily protocol) {
71          super(null, newSocketDgram(protocol), false);
72          config = new KQueueDatagramChannelConfig(this);
73      }
74  
75      public KQueueDatagramChannel(SocketProtocolFamily protocol) {
76          super(null, newSocketDgram(protocol), false);
77          config = new KQueueDatagramChannelConfig(this);
78      }
79  
80      public KQueueDatagramChannel(int fd) {
81          this(new BsdSocket(fd), true);
82      }
83  
84      KQueueDatagramChannel(BsdSocket socket, boolean active) {
85          super(null, socket, active);
86          config = new KQueueDatagramChannelConfig(this);
87      }
88  
89      @Override
90      public InetSocketAddress remoteAddress() {
91          return (InetSocketAddress) super.remoteAddress();
92      }
93  
94      @Override
95      public InetSocketAddress localAddress() {
96          return (InetSocketAddress) super.localAddress();
97      }
98  
99      @Override
100     @SuppressWarnings("deprecation")
101     public boolean isActive() {
102         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
103     }
104 
105     @Override
106     public boolean isConnected() {
107         return connected;
108     }
109 
110     @Override
111     public ChannelFuture joinGroup(InetAddress multicastAddress) {
112         return joinGroup(multicastAddress, newPromise());
113     }
114 
115     @Override
116     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
117         try {
118             NetworkInterface iface = config().getNetworkInterface();
119             if (iface == null) {
120                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
121             }
122             return joinGroup(multicastAddress, iface, null, promise);
123         } catch (SocketException e) {
124             promise.setFailure(e);
125         }
126         return promise;
127     }
128 
129     @Override
130     public ChannelFuture joinGroup(
131             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
132         return joinGroup(multicastAddress, networkInterface, newPromise());
133     }
134 
135     @Override
136     public ChannelFuture joinGroup(
137             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
138             ChannelPromise promise) {
139         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
140     }
141 
142     @Override
143     public ChannelFuture joinGroup(
144             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
145         return joinGroup(multicastAddress, networkInterface, source, newPromise());
146     }
147 
148     @Override
149     public ChannelFuture joinGroup(
150             final InetAddress multicastAddress, final NetworkInterface networkInterface,
151             final InetAddress source, final ChannelPromise promise) {
152 
153         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
154         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
155 
156         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
157         return promise;
158     }
159 
160     @Override
161     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
162         return leaveGroup(multicastAddress, newPromise());
163     }
164 
165     @Override
166     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
167         try {
168             return leaveGroup(
169                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
170         } catch (SocketException e) {
171             promise.setFailure(e);
172         }
173         return promise;
174     }
175 
176     @Override
177     public ChannelFuture leaveGroup(
178             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
179         return leaveGroup(multicastAddress, networkInterface, newPromise());
180     }
181 
182     @Override
183     public ChannelFuture leaveGroup(
184             InetSocketAddress multicastAddress,
185             NetworkInterface networkInterface, ChannelPromise promise) {
186         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
187     }
188 
189     @Override
190     public ChannelFuture leaveGroup(
191             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
192         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
193     }
194 
195     @Override
196     public ChannelFuture leaveGroup(
197             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
198             final ChannelPromise promise) {
199         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
200         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
201 
202         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
203 
204         return promise;
205     }
206 
207     @Override
208     public ChannelFuture block(
209             InetAddress multicastAddress, NetworkInterface networkInterface,
210             InetAddress sourceToBlock) {
211         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
212     }
213 
214     @Override
215     public ChannelFuture block(
216             final InetAddress multicastAddress, final NetworkInterface networkInterface,
217             final InetAddress sourceToBlock, final ChannelPromise promise) {
218         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
219         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
220         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
221         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
222         return promise;
223     }
224 
225     @Override
226     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
227         return block(multicastAddress, sourceToBlock, newPromise());
228     }
229 
230     @Override
231     public ChannelFuture block(
232             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
233         try {
234             return block(
235                     multicastAddress,
236                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
237                     sourceToBlock, promise);
238         } catch (Throwable e) {
239             promise.setFailure(e);
240         }
241         return promise;
242     }
243 
244     @Override
245     protected AbstractKQueueUnsafe newUnsafe() {
246         return new KQueueDatagramChannelUnsafe();
247     }
248 
249     @Override
250     protected void doBind(SocketAddress localAddress) throws Exception {
251         super.doBind(localAddress);
252         active = true;
253     }
254 
255     @Override
256     protected boolean doWriteMessage(Object msg) throws Exception {
257         final ByteBuf data;
258         InetSocketAddress remoteAddress;
259         if (msg instanceof AddressedEnvelope) {
260             @SuppressWarnings("unchecked")
261             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
262                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
263             data = envelope.content();
264             remoteAddress = envelope.recipient();
265         } else {
266             data = (ByteBuf) msg;
267             remoteAddress = null;
268         }
269 
270         final int dataLen = data.readableBytes();
271         if (dataLen == 0) {
272             return true;
273         }
274 
275         final long writtenBytes;
276         if (data.hasMemoryAddress()) {
277             long memoryAddress = data.memoryAddress();
278             if (remoteAddress == null) {
279                 try {
280                     writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
281                 } catch (Errors.NativeIoException e) {
282                     throw translateForConnected(e);
283                 }
284             } else {
285                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
286                         remoteAddress.getAddress(), remoteAddress.getPort());
287             }
288         } else if (data.nioBufferCount() > 1) {
289             IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
290             array.add(data, data.readerIndex(), data.readableBytes());
291             int cnt = array.count();
292             assert cnt != 0;
293 
294             if (remoteAddress == null) {
295                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
296             } else {
297                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
298                         remoteAddress.getAddress(), remoteAddress.getPort());
299             }
300         } else {
301             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
302             if (remoteAddress == null) {
303                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
304             } else {
305                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
306                         remoteAddress.getAddress(), remoteAddress.getPort());
307             }
308         }
309 
310         return writtenBytes > 0;
311     }
312 
313     private static IOException translateForConnected(Errors.NativeIoException e) {
314         // We need to correctly translate connect errors to match NIO behaviour.
315         if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
316             PortUnreachableException error = new PortUnreachableException(e.getMessage());
317             error.initCause(e);
318             return error;
319         }
320         return e;
321     }
322 
323     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
324         if (envelope.recipient() instanceof InetSocketAddress
325                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
326             throw new UnresolvedAddressException();
327         }
328     }
329 
330     @Override
331     protected Object filterOutboundMessage(Object msg) {
332         if (msg instanceof DatagramPacket) {
333             DatagramPacket packet = (DatagramPacket) msg;
334             checkUnresolved(packet);
335             ByteBuf content = packet.content();
336             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
337                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
338         }
339 
340         if (msg instanceof ByteBuf) {
341             ByteBuf buf = (ByteBuf) msg;
342             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
343         }
344 
345         if (msg instanceof AddressedEnvelope) {
346             @SuppressWarnings("unchecked")
347             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
348             checkUnresolved(e);
349 
350             if (e.content() instanceof ByteBuf &&
351                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
352 
353                 ByteBuf content = (ByteBuf) e.content();
354                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
355                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
356                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
357             }
358         }
359 
360         throw new UnsupportedOperationException(
361                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
362     }
363 
364     @Override
365     public KQueueDatagramChannelConfig config() {
366         return config;
367     }
368 
369     @Override
370     protected void doDisconnect() throws Exception {
371         socket.disconnect();
372         connected = active = false;
373         resetCachedAddresses();
374     }
375 
376     @Override
377     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
378         if (super.doConnect(remoteAddress, localAddress)) {
379             connected = true;
380             return true;
381         }
382         return false;
383     }
384 
385     @Override
386     protected void doClose() throws Exception {
387         super.doClose();
388         connected = false;
389     }
390 
391     final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
392 
393         @Override
394         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
395             assert eventLoop().inEventLoop();
396             final DatagramChannelConfig config = config();
397             if (shouldBreakReadReady(config)) {
398                 clearReadFilter0();
399                 return;
400             }
401             final ChannelPipeline pipeline = pipeline();
402             final ByteBufAllocator allocator = config.getAllocator();
403             allocHandle.reset(config);
404 
405             Throwable exception = null;
406             try {
407                 ByteBuf byteBuf = null;
408                 try {
409                     boolean connected = isConnected();
410                     do {
411                         byteBuf = allocHandle.allocate(allocator);
412                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
413 
414                         final DatagramPacket packet;
415                         if (connected) {
416                             try {
417                                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
418                             } catch (Errors.NativeIoException e) {
419                                 // We need to correctly translate connect errors to match NIO behaviour.
420                                 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
421                                     PortUnreachableException error = new PortUnreachableException(e.getMessage());
422                                     error.initCause(e);
423                                     throw error;
424                                 }
425                                 throw e;
426                             }
427                             if (allocHandle.lastBytesRead() <= 0) {
428                                 // nothing was read, release the buffer.
429                                 byteBuf.release();
430                                 byteBuf = null;
431                                 break;
432                             }
433                             packet = new DatagramPacket(byteBuf,
434                                     (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
435                         } else {
436                             final DatagramSocketAddress remoteAddress;
437                             if (byteBuf.hasMemoryAddress()) {
438                                 // has a memory address so use optimized call
439                                 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
440                                         byteBuf.capacity());
441                             } else {
442                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
443                                         byteBuf.writerIndex(), byteBuf.writableBytes());
444                                 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
445                             }
446 
447                             if (remoteAddress == null) {
448                                 allocHandle.lastBytesRead(-1);
449                                 byteBuf.release();
450                                 byteBuf = null;
451                                 break;
452                             }
453                             InetSocketAddress localAddress = remoteAddress.localAddress();
454                             if (localAddress == null) {
455                                 localAddress = (InetSocketAddress) localAddress();
456                             }
457                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
458                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
459 
460                             packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
461                         }
462 
463                         allocHandle.incMessagesRead(1);
464 
465                         readPending = false;
466                         pipeline.fireChannelRead(packet);
467 
468                         byteBuf = null;
469 
470                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
471                     // as we read anything).
472                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
473                 } catch (Throwable t) {
474                     if (byteBuf != null) {
475                         byteBuf.release();
476                     }
477                     exception = t;
478                 }
479 
480                 allocHandle.readComplete();
481                 pipeline.fireChannelReadComplete();
482 
483                 if (exception != null) {
484                     pipeline.fireExceptionCaught(exception);
485                 }
486             } finally {
487                 if (shouldStopReading(config)) {
488                     clearReadFilter0();
489                 }
490             }
491         }
492     }
493 }