View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramChannelConfig;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.InternetProtocolFamily;
29  import io.netty.channel.unix.DatagramSocketAddress;
30  import io.netty.channel.unix.Errors;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.UncheckedBooleanSupplier;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.UnstableApi;
37  
38  import java.net.InetAddress;
39  import java.net.InetSocketAddress;
40  import java.net.NetworkInterface;
41  import java.net.PortUnreachableException;
42  import java.net.SocketAddress;
43  import java.net.SocketException;
44  import java.nio.ByteBuffer;
45  
46  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
47  
48  @UnstableApi
49  public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
50      private static final String EXPECTED_TYPES =
51              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
52                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
54                      StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
55                      StringUtil.simpleClassName(ByteBuf.class) + ')';
56  
57      private volatile boolean connected;
58      private final KQueueDatagramChannelConfig config;
59  
60      public KQueueDatagramChannel() {
61          super(null, newSocketDgram(), false);
62          config = new KQueueDatagramChannelConfig(this);
63      }
64  
65      public KQueueDatagramChannel(InternetProtocolFamily protocol) {
66          super(null, newSocketDgram(protocol), false);
67          config = new KQueueDatagramChannelConfig(this);
68      }
69  
70      public KQueueDatagramChannel(int fd) {
71          this(new BsdSocket(fd), true);
72      }
73  
74      KQueueDatagramChannel(BsdSocket socket, boolean active) {
75          super(null, socket, active);
76          config = new KQueueDatagramChannelConfig(this);
77      }
78  
79      @Override
80      public InetSocketAddress remoteAddress() {
81          return (InetSocketAddress) super.remoteAddress();
82      }
83  
84      @Override
85      public InetSocketAddress localAddress() {
86          return (InetSocketAddress) super.localAddress();
87      }
88  
89      @Override
90      @SuppressWarnings("deprecation")
91      public boolean isActive() {
92          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
93      }
94  
95      @Override
96      public boolean isConnected() {
97          return connected;
98      }
99  
100     @Override
101     public ChannelFuture joinGroup(InetAddress multicastAddress) {
102         return joinGroup(multicastAddress, newPromise());
103     }
104 
105     @Override
106     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
107         try {
108             NetworkInterface iface = config().getNetworkInterface();
109             if (iface == null) {
110                 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
111             }
112             return joinGroup(multicastAddress, iface, null, promise);
113         } catch (SocketException e) {
114             promise.setFailure(e);
115         }
116         return promise;
117     }
118 
119     @Override
120     public ChannelFuture joinGroup(
121             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
122         return joinGroup(multicastAddress, networkInterface, newPromise());
123     }
124 
125     @Override
126     public ChannelFuture joinGroup(
127             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
128             ChannelPromise promise) {
129         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
130     }
131 
132     @Override
133     public ChannelFuture joinGroup(
134             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
135         return joinGroup(multicastAddress, networkInterface, source, newPromise());
136     }
137 
138     @Override
139     public ChannelFuture joinGroup(
140             final InetAddress multicastAddress, final NetworkInterface networkInterface,
141             final InetAddress source, final ChannelPromise promise) {
142 
143         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
144         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
145 
146         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
147         return promise;
148     }
149 
150     @Override
151     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
152         return leaveGroup(multicastAddress, newPromise());
153     }
154 
155     @Override
156     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
157         try {
158             return leaveGroup(
159                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
160         } catch (SocketException e) {
161             promise.setFailure(e);
162         }
163         return promise;
164     }
165 
166     @Override
167     public ChannelFuture leaveGroup(
168             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
169         return leaveGroup(multicastAddress, networkInterface, newPromise());
170     }
171 
172     @Override
173     public ChannelFuture leaveGroup(
174             InetSocketAddress multicastAddress,
175             NetworkInterface networkInterface, ChannelPromise promise) {
176         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
177     }
178 
179     @Override
180     public ChannelFuture leaveGroup(
181             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
182         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
183     }
184 
185     @Override
186     public ChannelFuture leaveGroup(
187             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
188             final ChannelPromise promise) {
189         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
190         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
191 
192         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
193 
194         return promise;
195     }
196 
197     @Override
198     public ChannelFuture block(
199             InetAddress multicastAddress, NetworkInterface networkInterface,
200             InetAddress sourceToBlock) {
201         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
202     }
203 
204     @Override
205     public ChannelFuture block(
206             final InetAddress multicastAddress, final NetworkInterface networkInterface,
207             final InetAddress sourceToBlock, final ChannelPromise promise) {
208         ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
209         ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
210         ObjectUtil.checkNotNull(networkInterface, "networkInterface");
211         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
212         return promise;
213     }
214 
215     @Override
216     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
217         return block(multicastAddress, sourceToBlock, newPromise());
218     }
219 
220     @Override
221     public ChannelFuture block(
222             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
223         try {
224             return block(
225                     multicastAddress,
226                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
227                     sourceToBlock, promise);
228         } catch (Throwable e) {
229             promise.setFailure(e);
230         }
231         return promise;
232     }
233 
234     @Override
235     protected AbstractKQueueUnsafe newUnsafe() {
236         return new KQueueDatagramChannelUnsafe();
237     }
238 
239     @Override
240     protected void doBind(SocketAddress localAddress) throws Exception {
241         super.doBind(localAddress);
242         active = true;
243     }
244 
245     @Override
246     protected boolean doWriteMessage(Object msg) throws Exception {
247         final ByteBuf data;
248         InetSocketAddress remoteAddress;
249         if (msg instanceof AddressedEnvelope) {
250             @SuppressWarnings("unchecked")
251             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
252                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
253             data = envelope.content();
254             remoteAddress = envelope.recipient();
255         } else {
256             data = (ByteBuf) msg;
257             remoteAddress = null;
258         }
259 
260         final int dataLen = data.readableBytes();
261         if (dataLen == 0) {
262             return true;
263         }
264 
265         final long writtenBytes;
266         if (data.hasMemoryAddress()) {
267             long memoryAddress = data.memoryAddress();
268             if (remoteAddress == null) {
269                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
270             } else {
271                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
272                         remoteAddress.getAddress(), remoteAddress.getPort());
273             }
274         } else if (data.nioBufferCount() > 1) {
275             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
276             array.add(data, data.readerIndex(), data.readableBytes());
277             int cnt = array.count();
278             assert cnt != 0;
279 
280             if (remoteAddress == null) {
281                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
282             } else {
283                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
284                         remoteAddress.getAddress(), remoteAddress.getPort());
285             }
286         } else {
287             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
288             if (remoteAddress == null) {
289                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
290             } else {
291                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
292                         remoteAddress.getAddress(), remoteAddress.getPort());
293             }
294         }
295 
296         return writtenBytes > 0;
297     }
298 
299     @Override
300     protected Object filterOutboundMessage(Object msg) {
301         if (msg instanceof DatagramPacket) {
302             DatagramPacket packet = (DatagramPacket) msg;
303             ByteBuf content = packet.content();
304             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
305                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
306         }
307 
308         if (msg instanceof ByteBuf) {
309             ByteBuf buf = (ByteBuf) msg;
310             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
311         }
312 
313         if (msg instanceof AddressedEnvelope) {
314             @SuppressWarnings("unchecked")
315             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
316             if (e.content() instanceof ByteBuf &&
317                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
318 
319                 ByteBuf content = (ByteBuf) e.content();
320                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
321                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
322                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
323             }
324         }
325 
326         throw new UnsupportedOperationException(
327                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
328     }
329 
330     @Override
331     public KQueueDatagramChannelConfig config() {
332         return config;
333     }
334 
335     @Override
336     protected void doDisconnect() throws Exception {
337         socket.disconnect();
338         connected = active = false;
339         resetCachedAddresses();
340     }
341 
342     @Override
343     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
344         if (super.doConnect(remoteAddress, localAddress)) {
345             connected = true;
346             return true;
347         }
348         return false;
349     }
350 
351     @Override
352     protected void doClose() throws Exception {
353         super.doClose();
354         connected = false;
355     }
356 
357     final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
358 
359         @Override
360         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
361             assert eventLoop().inEventLoop();
362             final DatagramChannelConfig config = config();
363             if (shouldBreakReadReady(config)) {
364                 clearReadFilter0();
365                 return;
366             }
367             final ChannelPipeline pipeline = pipeline();
368             final ByteBufAllocator allocator = config.getAllocator();
369             allocHandle.reset(config);
370             readReadyBefore();
371 
372             Throwable exception = null;
373             try {
374                 ByteBuf byteBuf = null;
375                 try {
376                     boolean connected = isConnected();
377                     do {
378                         byteBuf = allocHandle.allocate(allocator);
379                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
380 
381                         final DatagramPacket packet;
382                         if (connected) {
383                             try {
384                                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
385                             } catch (Errors.NativeIoException e) {
386                                 // We need to correctly translate connect errors to match NIO behaviour.
387                                 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
388                                     PortUnreachableException error = new PortUnreachableException(e.getMessage());
389                                     error.initCause(e);
390                                     throw error;
391                                 }
392                                 throw e;
393                             }
394                             if (allocHandle.lastBytesRead() <= 0) {
395                                 // nothing was read, release the buffer.
396                                 byteBuf.release();
397                                 byteBuf = null;
398                                 break;
399                             }
400                             packet = new DatagramPacket(byteBuf,
401                                     (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
402                         } else {
403                             final DatagramSocketAddress remoteAddress;
404                             if (byteBuf.hasMemoryAddress()) {
405                                 // has a memory address so use optimized call
406                                 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
407                                         byteBuf.capacity());
408                             } else {
409                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
410                                         byteBuf.writerIndex(), byteBuf.writableBytes());
411                                 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
412                             }
413 
414                             if (remoteAddress == null) {
415                                 allocHandle.lastBytesRead(-1);
416                                 byteBuf.release();
417                                 byteBuf = null;
418                                 break;
419                             }
420                             InetSocketAddress localAddress = remoteAddress.localAddress();
421                             if (localAddress == null) {
422                                 localAddress = (InetSocketAddress) localAddress();
423                             }
424                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
425                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
426 
427                             packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
428                         }
429 
430                         allocHandle.incMessagesRead(1);
431 
432                         readPending = false;
433                         pipeline.fireChannelRead(packet);
434 
435                         byteBuf = null;
436 
437                     // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
438                     // as we read anything).
439                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
440                 } catch (Throwable t) {
441                     if (byteBuf != null) {
442                         byteBuf.release();
443                     }
444                     exception = t;
445                 }
446 
447                 allocHandle.readComplete();
448                 pipeline.fireChannelReadComplete();
449 
450                 if (exception != null) {
451                     pipeline.fireExceptionCaught(exception);
452                 }
453             } finally {
454                 readReadyFinally(config);
455             }
456         }
457     }
458 }