View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultAddressedEnvelope;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.unix.DatagramSocketAddress;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.UnstableApi;
35  
36  import java.io.IOException;
37  import java.net.InetAddress;
38  import java.net.InetSocketAddress;
39  import java.net.NetworkInterface;
40  import java.net.SocketAddress;
41  import java.net.SocketException;
42  import java.nio.ByteBuffer;
43  import java.util.ArrayList;
44  import java.util.List;
45  
46  import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
47  
48  @UnstableApi
49  public final class KQueueDatagramChannel extends AbstractKQueueChannel implements DatagramChannel {
50      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
55                      StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56                      StringUtil.simpleClassName(ByteBuf.class) + ')';
57  
58      private volatile boolean connected;
59      private final KQueueDatagramChannelConfig config;
60  
61      public KQueueDatagramChannel() {
62          super(null, newSocketDgram(), false);
63          config = new KQueueDatagramChannelConfig(this);
64      }
65  
66      public KQueueDatagramChannel(int fd) {
67          this(new BsdSocket(fd), true);
68      }
69  
70      KQueueDatagramChannel(BsdSocket socket, boolean active) {
71          super(null, socket, active);
72          config = new KQueueDatagramChannelConfig(this);
73      }
74  
75      @Override
76      public InetSocketAddress remoteAddress() {
77          return (InetSocketAddress) super.remoteAddress();
78      }
79  
80      @Override
81      public InetSocketAddress localAddress() {
82          return (InetSocketAddress) super.localAddress();
83      }
84  
85      @Override
86      public ChannelMetadata metadata() {
87          return METADATA;
88      }
89  
90      @Override
91      @SuppressWarnings("deprecation")
92      public boolean isActive() {
93          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
94      }
95  
96      @Override
97      public boolean isConnected() {
98          return connected;
99      }
100 
101     @Override
102     public ChannelFuture joinGroup(InetAddress multicastAddress) {
103         return joinGroup(multicastAddress, newPromise());
104     }
105 
106     @Override
107     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
108         try {
109             return joinGroup(
110                     multicastAddress,
111                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
112                     null, promise);
113         } catch (SocketException e) {
114             promise.setFailure(e);
115         }
116         return promise;
117     }
118 
119     @Override
120     public ChannelFuture joinGroup(
121             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
122         return joinGroup(multicastAddress, networkInterface, newPromise());
123     }
124 
125     @Override
126     public ChannelFuture joinGroup(
127             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
128             ChannelPromise promise) {
129         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
130     }
131 
132     @Override
133     public ChannelFuture joinGroup(
134             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
135         return joinGroup(multicastAddress, networkInterface, source, newPromise());
136     }
137 
138     @Override
139     public ChannelFuture joinGroup(
140             final InetAddress multicastAddress, final NetworkInterface networkInterface,
141             final InetAddress source, final ChannelPromise promise) {
142 
143         if (multicastAddress == null) {
144             throw new NullPointerException("multicastAddress");
145         }
146 
147         if (networkInterface == null) {
148             throw new NullPointerException("networkInterface");
149         }
150 
151         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
152         return promise;
153     }
154 
155     @Override
156     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
157         return leaveGroup(multicastAddress, newPromise());
158     }
159 
160     @Override
161     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
162         try {
163             return leaveGroup(
164                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
165         } catch (SocketException e) {
166             promise.setFailure(e);
167         }
168         return promise;
169     }
170 
171     @Override
172     public ChannelFuture leaveGroup(
173             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
174         return leaveGroup(multicastAddress, networkInterface, newPromise());
175     }
176 
177     @Override
178     public ChannelFuture leaveGroup(
179             InetSocketAddress multicastAddress,
180             NetworkInterface networkInterface, ChannelPromise promise) {
181         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
182     }
183 
184     @Override
185     public ChannelFuture leaveGroup(
186             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
187         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
188     }
189 
190     @Override
191     public ChannelFuture leaveGroup(
192             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
193             final ChannelPromise promise) {
194         if (multicastAddress == null) {
195             throw new NullPointerException("multicastAddress");
196         }
197         if (networkInterface == null) {
198             throw new NullPointerException("networkInterface");
199         }
200 
201         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
202 
203         return promise;
204     }
205 
206     @Override
207     public ChannelFuture block(
208             InetAddress multicastAddress, NetworkInterface networkInterface,
209             InetAddress sourceToBlock) {
210         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
211     }
212 
213     @Override
214     public ChannelFuture block(
215             final InetAddress multicastAddress, final NetworkInterface networkInterface,
216             final InetAddress sourceToBlock, final ChannelPromise promise) {
217         if (multicastAddress == null) {
218             throw new NullPointerException("multicastAddress");
219         }
220         if (sourceToBlock == null) {
221             throw new NullPointerException("sourceToBlock");
222         }
223 
224         if (networkInterface == null) {
225             throw new NullPointerException("networkInterface");
226         }
227         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
228         return promise;
229     }
230 
231     @Override
232     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
233         return block(multicastAddress, sourceToBlock, newPromise());
234     }
235 
236     @Override
237     public ChannelFuture block(
238             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
239         try {
240             return block(
241                     multicastAddress,
242                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
243                     sourceToBlock, promise);
244         } catch (Throwable e) {
245             promise.setFailure(e);
246         }
247         return promise;
248     }
249 
250     @Override
251     protected AbstractKQueueUnsafe newUnsafe() {
252         return new KQueueDatagramChannelUnsafe();
253     }
254 
255     @Override
256     protected void doBind(SocketAddress localAddress) throws Exception {
257         super.doBind(localAddress);
258         active = true;
259     }
260 
261     @Override
262     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
263         for (;;) {
264             Object msg = in.current();
265             if (msg == null) {
266                 // Wrote all messages.
267                 writeFilter(false);
268                 break;
269             }
270 
271             try {
272                 boolean done = false;
273                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
274                     if (doWriteMessage(msg)) {
275                         done = true;
276                         break;
277                     }
278                 }
279 
280                 if (done) {
281                     in.remove();
282                 } else {
283                     // Did not write all messages.
284                     writeFilter(true);
285                     break;
286                 }
287             } catch (IOException e) {
288                 // Continue on write error as a DatagramChannel can write to multiple remote peers
289                 //
290                 // See https://github.com/netty/netty/issues/2665
291                 in.remove(e);
292             }
293         }
294     }
295 
296     private boolean doWriteMessage(Object msg) throws Exception {
297         final ByteBuf data;
298         InetSocketAddress remoteAddress;
299         if (msg instanceof AddressedEnvelope) {
300             @SuppressWarnings("unchecked")
301             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
302                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
303             data = envelope.content();
304             remoteAddress = envelope.recipient();
305         } else {
306             data = (ByteBuf) msg;
307             remoteAddress = null;
308         }
309 
310         final int dataLen = data.readableBytes();
311         if (dataLen == 0) {
312             return true;
313         }
314 
315         final long writtenBytes;
316         if (data.hasMemoryAddress()) {
317             long memoryAddress = data.memoryAddress();
318             if (remoteAddress == null) {
319                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
320             } else {
321                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
322                         remoteAddress.getAddress(), remoteAddress.getPort());
323             }
324         } else if (data.nioBufferCount() > 1) {
325             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
326             array.add(data);
327             int cnt = array.count();
328             assert cnt != 0;
329 
330             if (remoteAddress == null) {
331                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
332             } else {
333                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
334                         remoteAddress.getAddress(), remoteAddress.getPort());
335             }
336         } else  {
337             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
338             if (remoteAddress == null) {
339                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
340             } else {
341                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
342                         remoteAddress.getAddress(), remoteAddress.getPort());
343             }
344         }
345 
346         return writtenBytes > 0;
347     }
348 
349     @Override
350     protected Object filterOutboundMessage(Object msg) {
351         if (msg instanceof DatagramPacket) {
352             DatagramPacket packet = (DatagramPacket) msg;
353             ByteBuf content = packet.content();
354             return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
355                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
356         }
357 
358         if (msg instanceof ByteBuf) {
359             ByteBuf buf = (ByteBuf) msg;
360             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
361         }
362 
363         if (msg instanceof AddressedEnvelope) {
364             @SuppressWarnings("unchecked")
365             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
366             if (e.content() instanceof ByteBuf &&
367                     (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
368 
369                 ByteBuf content = (ByteBuf) e.content();
370                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
371                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
372                                 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
373             }
374         }
375 
376         throw new UnsupportedOperationException(
377                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
378     }
379 
380     @Override
381     public KQueueDatagramChannelConfig config() {
382         return config;
383     }
384 
385     @Override
386     protected void doDisconnect() throws Exception {
387         socket.disconnect();
388         connected = active = false;
389     }
390 
391     @Override
392     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
393         if (super.doConnect(remoteAddress, localAddress)) {
394             connected = true;
395             return true;
396         }
397         return false;
398     }
399 
400     @Override
401     protected void doClose() throws Exception {
402         super.doClose();
403         connected = false;
404     }
405 
406     final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
407 
408         @Override
409         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
410             assert eventLoop().inEventLoop();
411             final DatagramChannelConfig config = config();
412             if (shouldBreakReadReady(config)) {
413                 clearReadFilter0();
414                 return;
415             }
416             final ChannelPipeline pipeline = pipeline();
417             final ByteBufAllocator allocator = config.getAllocator();
418             allocHandle.reset(config);
419             readReadyBefore();
420 
421             Throwable exception = null;
422             try {
423                 ByteBuf data = null;
424                 try {
425                     do {
426                         data = allocHandle.allocate(allocator);
427                         allocHandle.attemptedBytesRead(data.writableBytes());
428                         final DatagramSocketAddress remoteAddress;
429                         if (data.hasMemoryAddress()) {
430                             // has a memory address so use optimized call
431                             remoteAddress = socket.recvFromAddress(data.memoryAddress(), data.writerIndex(),
432                                     data.capacity());
433                         } else {
434                             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
435                             remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
436                         }
437 
438                         if (remoteAddress == null) {
439                             allocHandle.lastBytesRead(-1);
440                             data.release();
441                             data = null;
442                             break;
443                         }
444 
445                         allocHandle.incMessagesRead(1);
446                         allocHandle.lastBytesRead(remoteAddress.receivedAmount());
447                         data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
448 
449                         readPending = false;
450                         pipeline.fireChannelRead(
451                                 new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
452 
453                         data = null;
454                     } while (allocHandle.continueReading());
455                 } catch (Throwable t) {
456                     if (data != null) {
457                         data.release();
458                     }
459                     exception = t;
460                 }
461 
462                 allocHandle.readComplete();
463                 pipeline.fireChannelReadComplete();
464 
465                 if (exception != null) {
466                     pipeline.fireExceptionCaught(exception);
467                 }
468             } finally {
469                 readReadyFinally(config);
470             }
471         }
472     }
473 }