View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramChannelConfig;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.InternetProtocolFamily;
29  import io.netty.channel.unix.DatagramSocketAddress;
30  import io.netty.channel.unix.Errors;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.UncheckedBooleanSupplier;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.UnstableApi;
37  
38  import java.net.InetAddress;
39  import java.net.InetSocketAddress;
40  import java.net.NetworkInterface;
41  import java.net.PortUnreachableException;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.UnresolvedAddressException;
46  
47  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
48  
49  @UnstableApi
50  public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
55                      StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56                      StringUtil.simpleClassName(ByteBuf.class) + ')';
57  
58      private volatile boolean connected;
59      private final KQueueDatagramChannelConfig config;
60  
61      public KQueueDatagramChannel() {
62          super(null, newSocketDgram(), false);
63          config = new KQueueDatagramChannelConfig(this);
64      }
65  
66      public KQueueDatagramChannel(InternetProtocolFamily protocol) {
67          super(null, newSocketDgram(protocol), false);
68          config = new KQueueDatagramChannelConfig(this);
69      }
70  
71      public KQueueDatagramChannel(int fd) {
72          this(new BsdSocket(fd), true);
73      }
74  
75      KQueueDatagramChannel(BsdSocket socket, boolean active) {
76          super(null, socket, active);
77          config = new KQueueDatagramChannelConfig(this);
78      }
79  
80      @Override
81      public InetSocketAddress remoteAddress() {
82          return (InetSocketAddress) super.remoteAddress();
83      }
84  
85      @Override
86      public InetSocketAddress localAddress() {
87          return (InetSocketAddress) super.localAddress();
88      }
89  
90      @Override
91      @SuppressWarnings("deprecation")
92      public boolean isActive() {
93          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
94      }
95  
96      @Override
97      public boolean isConnected() {
98          return connected;
99      }
100 
101     @Override
102     public ChannelFuture joinGroup(InetAddress multicastAddress) {
103         return joinGroup(multicastAddress, newPromise());
104     }
105 
106     @Override
107     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
108         try {
109             NetworkInterface iface = config().getNetworkInterface();
110             if (iface == null) {
111                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
112             }
113             return joinGroup(multicastAddress, iface, null, promise);
114         } catch (SocketException e) {
115             promise.setFailure(e);
116         }
117         return promise;
118     }
119 
120     @Override
121     public ChannelFuture joinGroup(
122             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
123         return joinGroup(multicastAddress, networkInterface, newPromise());
124     }
125 
126     @Override
127     public ChannelFuture joinGroup(
128             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
129             ChannelPromise promise) {
130         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
131     }
132 
133     @Override
134     public ChannelFuture joinGroup(
135             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
136         return joinGroup(multicastAddress, networkInterface, source, newPromise());
137     }
138 
139     @Override
140     public ChannelFuture joinGroup(
141             final InetAddress multicastAddress, final NetworkInterface networkInterface,
142             final InetAddress source, final ChannelPromise promise) {
143 
144         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
145         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
146 
147         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
148         return promise;
149     }
150 
151     @Override
152     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
153         return leaveGroup(multicastAddress, newPromise());
154     }
155 
156     @Override
157     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
158         try {
159             return leaveGroup(
160                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
161         } catch (SocketException e) {
162             promise.setFailure(e);
163         }
164         return promise;
165     }
166 
167     @Override
168     public ChannelFuture leaveGroup(
169             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
170         return leaveGroup(multicastAddress, networkInterface, newPromise());
171     }
172 
173     @Override
174     public ChannelFuture leaveGroup(
175             InetSocketAddress multicastAddress,
176             NetworkInterface networkInterface, ChannelPromise promise) {
177         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
178     }
179 
180     @Override
181     public ChannelFuture leaveGroup(
182             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
183         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
184     }
185 
186     @Override
187     public ChannelFuture leaveGroup(
188             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
189             final ChannelPromise promise) {
190         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
191         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
192 
193         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
194 
195         return promise;
196     }
197 
198     @Override
199     public ChannelFuture block(
200             InetAddress multicastAddress, NetworkInterface networkInterface,
201             InetAddress sourceToBlock) {
202         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
203     }
204 
205     @Override
206     public ChannelFuture block(
207             final InetAddress multicastAddress, final NetworkInterface networkInterface,
208             final InetAddress sourceToBlock, final ChannelPromise promise) {
209         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
210         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
211         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
212         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
213         return promise;
214     }
215 
216     @Override
217     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
218         return block(multicastAddress, sourceToBlock, newPromise());
219     }
220 
221     @Override
222     public ChannelFuture block(
223             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
224         try {
225             return block(
226                     multicastAddress,
227                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
228                     sourceToBlock, promise);
229         } catch (Throwable e) {
230             promise.setFailure(e);
231         }
232         return promise;
233     }
234 
235     @Override
236     protected AbstractKQueueUnsafe newUnsafe() {
237         return new KQueueDatagramChannelUnsafe();
238     }
239 
240     @Override
241     protected void doBind(SocketAddress localAddress) throws Exception {
242         super.doBind(localAddress);
243         active = true;
244     }
245 
246     @Override
247     protected boolean doWriteMessage(Object msg) throws Exception {
248         final ByteBuf data;
249         InetSocketAddress remoteAddress;
250         if (msg instanceof AddressedEnvelope) {
251             @SuppressWarnings("unchecked")
252             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
253                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
254             data = envelope.content();
255             remoteAddress = envelope.recipient();
256         } else {
257             data = (ByteBuf) msg;
258             remoteAddress = null;
259         }
260 
261         final int dataLen = data.readableBytes();
262         if (dataLen == 0) {
263             return true;
264         }
265 
266         final long writtenBytes;
267         if (data.hasMemoryAddress()) {
268             long memoryAddress = data.memoryAddress();
269             if (remoteAddress == null) {
270                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
271             } else {
272                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
273                         remoteAddress.getAddress(), remoteAddress.getPort());
274             }
275         } else if (data.nioBufferCount() > 1) {
276             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
277             array.add(data, data.readerIndex(), data.readableBytes());
278             int cnt = array.count();
279             assert cnt != 0;
280 
281             if (remoteAddress == null) {
282                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
283             } else {
284                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
285                         remoteAddress.getAddress(), remoteAddress.getPort());
286             }
287         } else {
288             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
289             if (remoteAddress == null) {
290                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
291             } else {
292                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
293                         remoteAddress.getAddress(), remoteAddress.getPort());
294             }
295         }
296 
297         return writtenBytes > 0;
298     }
299 
300     private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
301         if (envelope.recipient() instanceof InetSocketAddress
302                 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
303             throw new UnresolvedAddressException();
304         }
305     }
306 
307     @Override
308     protected Object filterOutboundMessage(Object msg) {
309         if (msg instanceof DatagramPacket) {
310             DatagramPacket packet = (DatagramPacket) msg;
311             checkUnresolved(packet);
312             ByteBuf content = packet.content();
313             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
314                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
315         }
316 
317         if (msg instanceof ByteBuf) {
318             ByteBuf buf = (ByteBuf) msg;
319             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
320         }
321 
322         if (msg instanceof AddressedEnvelope) {
323             @SuppressWarnings("unchecked")
324             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
325             checkUnresolved(e);
326 
327             if (e.content() instanceof ByteBuf &&
328                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
329 
330                 ByteBuf content = (ByteBuf) e.content();
331                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
332                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
333                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
334             }
335         }
336 
337         throw new UnsupportedOperationException(
338                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
339     }
340 
341     @Override
342     public KQueueDatagramChannelConfig config() {
343         return config;
344     }
345 
346     @Override
347     protected void doDisconnect() throws Exception {
348         socket.disconnect();
349         connected = active = false;
350         resetCachedAddresses();
351     }
352 
353     @Override
354     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
355         if (super.doConnect(remoteAddress, localAddress)) {
356             connected = true;
357             return true;
358         }
359         return false;
360     }
361 
362     @Override
363     protected void doClose() throws Exception {
364         super.doClose();
365         connected = false;
366     }
367 
368     final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
369 
370         @Override
371         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
372             assert eventLoop().inEventLoop();
373             final DatagramChannelConfig config = config();
374             if (shouldBreakReadReady(config)) {
375                 clearReadFilter0();
376                 return;
377             }
378             final ChannelPipeline pipeline = pipeline();
379             final ByteBufAllocator allocator = config.getAllocator();
380             allocHandle.reset(config);
381             readReadyBefore();
382 
383             Throwable exception = null;
384             try {
385                 ByteBuf byteBuf = null;
386                 try {
387                     boolean connected = isConnected();
388                     do {
389                         byteBuf = allocHandle.allocate(allocator);
390                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
391 
392                         final DatagramPacket packet;
393                         if (connected) {
394                             try {
395                                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
396                             } catch (Errors.NativeIoException e) {
397                                 // We need to correctly translate connect errors to match NIO behaviour.
398                                 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
399                                     PortUnreachableException error = new PortUnreachableException(e.getMessage());
400                                     error.initCause(e);
401                                     throw error;
402                                 }
403                                 throw e;
404                             }
405                             if (allocHandle.lastBytesRead() <= 0) {
406                                 // nothing was read, release the buffer.
407                                 byteBuf.release();
408                                 byteBuf = null;
409                                 break;
410                             }
411                             packet = new DatagramPacket(byteBuf,
412                                     (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
413                         } else {
414                             final DatagramSocketAddress remoteAddress;
415                             if (byteBuf.hasMemoryAddress()) {
416                                 // has a memory address so use optimized call
417                                 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
418                                         byteBuf.capacity());
419                             } else {
420                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
421                                         byteBuf.writerIndex(), byteBuf.writableBytes());
422                                 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
423                             }
424 
425                             if (remoteAddress == null) {
426                                 allocHandle.lastBytesRead(-1);
427                                 byteBuf.release();
428                                 byteBuf = null;
429                                 break;
430                             }
431                             InetSocketAddress localAddress = remoteAddress.localAddress();
432                             if (localAddress == null) {
433                                 localAddress = (InetSocketAddress) localAddress();
434                             }
435                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
436                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
437 
438                             packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
439                         }
440 
441                         allocHandle.incMessagesRead(1);
442 
443                         readPending = false;
444                         pipeline.fireChannelRead(packet);
445 
446                         byteBuf = null;
447 
448                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
449                     // as we read anything).
450                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
451                 } catch (Throwable t) {
452                     if (byteBuf != null) {
453                         byteBuf.release();
454                     }
455                     exception = t;
456                 }
457 
458                 allocHandle.readComplete();
459                 pipeline.fireChannelReadComplete();
460 
461                 if (exception != null) {
462                     pipeline.fireExceptionCaught(exception);
463                 }
464             } finally {
465                 readReadyFinally(config);
466             }
467         }
468     }
469 }