View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultAddressedEnvelope;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.unix.DatagramSocketAddress;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.internal.StringUtil;
34  
35  import java.io.IOException;
36  import java.net.InetAddress;
37  import java.net.InetSocketAddress;
38  import java.net.NetworkInterface;
39  import java.net.SocketAddress;
40  import java.net.SocketException;
41  import java.nio.ByteBuffer;
42  
43  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
44  
45  /**
46   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
47   * maximal performance.
48   */
49  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
50      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54              StringUtil.simpleClassName(ByteBuf.class) + ", " +
55              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56              StringUtil.simpleClassName(ByteBuf.class) + ')';
57  
58      private final EpollDatagramChannelConfig config;
59      private volatile boolean connected;
60  
61      public EpollDatagramChannel() {
62          super(newSocketDgram(), Native.EPOLLIN);
63          config = new EpollDatagramChannelConfig(this);
64      }
65  
66      public EpollDatagramChannel(int fd) {
67          this(new LinuxSocket(fd));
68      }
69  
70      EpollDatagramChannel(LinuxSocket fd) {
71          super(null, fd, Native.EPOLLIN, true);
72          config = new EpollDatagramChannelConfig(this);
73      }
74  
75      @Override
76      public InetSocketAddress remoteAddress() {
77          return (InetSocketAddress) super.remoteAddress();
78      }
79  
80      @Override
81      public InetSocketAddress localAddress() {
82          return (InetSocketAddress) super.localAddress();
83      }
84  
85      @Override
86      public ChannelMetadata metadata() {
87          return METADATA;
88      }
89  
90      @Override
91      @SuppressWarnings("deprecation")
92      public boolean isActive() {
93          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
94      }
95  
96      @Override
97      public boolean isConnected() {
98          return connected;
99      }
100 
101     @Override
102     public ChannelFuture joinGroup(InetAddress multicastAddress) {
103         return joinGroup(multicastAddress, newPromise());
104     }
105 
106     @Override
107     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
108         try {
109             return joinGroup(
110                     multicastAddress,
111                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
112         } catch (SocketException e) {
113             promise.setFailure(e);
114         }
115         return promise;
116     }
117 
118     @Override
119     public ChannelFuture joinGroup(
120             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
121         return joinGroup(multicastAddress, networkInterface, newPromise());
122     }
123 
124     @Override
125     public ChannelFuture joinGroup(
126             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
127             ChannelPromise promise) {
128         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
129     }
130 
131     @Override
132     public ChannelFuture joinGroup(
133             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
134         return joinGroup(multicastAddress, networkInterface, source, newPromise());
135     }
136 
137     @Override
138     public ChannelFuture joinGroup(
139             final InetAddress multicastAddress, final NetworkInterface networkInterface,
140             final InetAddress source, final ChannelPromise promise) {
141 
142         if (multicastAddress == null) {
143             throw new NullPointerException("multicastAddress");
144         }
145 
146         if (networkInterface == null) {
147             throw new NullPointerException("networkInterface");
148         }
149 
150         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
151         return promise;
152     }
153 
154     @Override
155     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
156         return leaveGroup(multicastAddress, newPromise());
157     }
158 
159     @Override
160     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
161         try {
162             return leaveGroup(
163                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
164         } catch (SocketException e) {
165             promise.setFailure(e);
166         }
167         return promise;
168     }
169 
170     @Override
171     public ChannelFuture leaveGroup(
172             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
173         return leaveGroup(multicastAddress, networkInterface, newPromise());
174     }
175 
176     @Override
177     public ChannelFuture leaveGroup(
178             InetSocketAddress multicastAddress,
179             NetworkInterface networkInterface, ChannelPromise promise) {
180         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
181     }
182 
183     @Override
184     public ChannelFuture leaveGroup(
185             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
186         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
187     }
188 
189     @Override
190     public ChannelFuture leaveGroup(
191             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
192             final ChannelPromise promise) {
193         if (multicastAddress == null) {
194             throw new NullPointerException("multicastAddress");
195         }
196         if (networkInterface == null) {
197             throw new NullPointerException("networkInterface");
198         }
199 
200         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
201 
202         return promise;
203     }
204 
205     @Override
206     public ChannelFuture block(
207             InetAddress multicastAddress, NetworkInterface networkInterface,
208             InetAddress sourceToBlock) {
209         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
210     }
211 
212     @Override
213     public ChannelFuture block(
214             final InetAddress multicastAddress, final NetworkInterface networkInterface,
215             final InetAddress sourceToBlock, final ChannelPromise promise) {
216         if (multicastAddress == null) {
217             throw new NullPointerException("multicastAddress");
218         }
219         if (sourceToBlock == null) {
220             throw new NullPointerException("sourceToBlock");
221         }
222 
223         if (networkInterface == null) {
224             throw new NullPointerException("networkInterface");
225         }
226         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
227         return promise;
228     }
229 
230     @Override
231     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
232         return block(multicastAddress, sourceToBlock, newPromise());
233     }
234 
235     @Override
236     public ChannelFuture block(
237             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
238         try {
239             return block(
240                     multicastAddress,
241                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
242                     sourceToBlock, promise);
243         } catch (Throwable e) {
244             promise.setFailure(e);
245         }
246         return promise;
247     }
248 
249     @Override
250     protected AbstractEpollUnsafe newUnsafe() {
251         return new EpollDatagramChannelUnsafe();
252     }
253 
254     @Override
255     protected void doBind(SocketAddress localAddress) throws Exception {
256         super.doBind(localAddress);
257         active = true;
258     }
259 
260     @Override
261     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
262         for (;;) {
263             Object msg = in.current();
264             if (msg == null) {
265                 // Wrote all messages.
266                 clearFlag(Native.EPOLLOUT);
267                 break;
268             }
269 
270             try {
271                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
272                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
273                     NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
274                     int cnt = array.count();
275 
276                     if (cnt >= 1) {
277                         // Try to use gathering writes via sendmmsg(...) syscall.
278                         int offset = 0;
279                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
280 
281                         while (cnt > 0) {
282                             int send = Native.sendmmsg(socket.intValue(), packets, offset, cnt);
283                             if (send == 0) {
284                                 // Did not write all messages.
285                                 setFlag(Native.EPOLLOUT);
286                                 return;
287                             }
288                             for (int i = 0; i < send; i++) {
289                                 in.remove();
290                             }
291                             cnt -= send;
292                             offset += send;
293                         }
294                         continue;
295                     }
296                 }
297                 boolean done = false;
298                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
299                     if (doWriteMessage(msg)) {
300                         done = true;
301                         break;
302                     }
303                 }
304 
305                 if (done) {
306                     in.remove();
307                 } else {
308                     // Did not write all messages.
309                     setFlag(Native.EPOLLOUT);
310                     break;
311                 }
312             } catch (IOException e) {
313                 // Continue on write error as a DatagramChannel can write to multiple remote peers
314                 //
315                 // See https://github.com/netty/netty/issues/2665
316                 in.remove(e);
317             }
318         }
319     }
320 
321     private boolean doWriteMessage(Object msg) throws Exception {
322         final ByteBuf data;
323         InetSocketAddress remoteAddress;
324         if (msg instanceof AddressedEnvelope) {
325             @SuppressWarnings("unchecked")
326             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
327                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
328             data = envelope.content();
329             remoteAddress = envelope.recipient();
330         } else {
331             data = (ByteBuf) msg;
332             remoteAddress = null;
333         }
334 
335         final int dataLen = data.readableBytes();
336         if (dataLen == 0) {
337             return true;
338         }
339 
340         final long writtenBytes;
341         if (data.hasMemoryAddress()) {
342             long memoryAddress = data.memoryAddress();
343             if (remoteAddress == null) {
344                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
345             } else {
346                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
347                         remoteAddress.getAddress(), remoteAddress.getPort());
348             }
349         } else if (data.nioBufferCount() > 1) {
350             IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
351             array.add(data);
352             int cnt = array.count();
353             assert cnt != 0;
354 
355             if (remoteAddress == null) {
356                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
357             } else {
358                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
359                         remoteAddress.getAddress(), remoteAddress.getPort());
360             }
361         } else  {
362             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
363             if (remoteAddress == null) {
364                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
365             } else {
366                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
367                         remoteAddress.getAddress(), remoteAddress.getPort());
368             }
369         }
370 
371         return writtenBytes > 0;
372     }
373 
374     @Override
375     protected Object filterOutboundMessage(Object msg) {
376         if (msg instanceof DatagramPacket) {
377             DatagramPacket packet = (DatagramPacket) msg;
378             ByteBuf content = packet.content();
379             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
380                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
381         }
382 
383         if (msg instanceof ByteBuf) {
384             ByteBuf buf = (ByteBuf) msg;
385             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
386         }
387 
388         if (msg instanceof AddressedEnvelope) {
389             @SuppressWarnings("unchecked")
390             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
391             if (e.content() instanceof ByteBuf &&
392                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
393 
394                 ByteBuf content = (ByteBuf) e.content();
395                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
396                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
397                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
398             }
399         }
400 
401         throw new UnsupportedOperationException(
402                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
403     }
404 
405     @Override
406     public EpollDatagramChannelConfig config() {
407         return config;
408     }
409 
410     @Override
411     protected void doDisconnect() throws Exception {
412         socket.disconnect();
413         connected = active = false;
414     }
415 
416     @Override
417     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
418         if (super.doConnect(remoteAddress, localAddress)) {
419             connected = true;
420             return true;
421         }
422         return false;
423     }
424 
425     @Override
426     protected void doClose() throws Exception {
427         super.doClose();
428         connected = false;
429     }
430 
431     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
432 
433         @Override
434         void epollInReady() {
435             assert eventLoop().inEventLoop();
436             DatagramChannelConfig config = config();
437             if (shouldBreakEpollInReady(config)) {
438                 clearEpollIn0();
439                 return;
440             }
441             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
442             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
443 
444             final ChannelPipeline pipeline = pipeline();
445             final ByteBufAllocator allocator = config.getAllocator();
446             allocHandle.reset(config);
447             epollInBefore();
448 
449             Throwable exception = null;
450             try {
451                 ByteBuf data = null;
452                 try {
453                     do {
454                         data = allocHandle.allocate(allocator);
455                         allocHandle.attemptedBytesRead(data.writableBytes());
456                         final DatagramSocketAddress remoteAddress;
457                         if (data.hasMemoryAddress()) {
458                             // has a memory address so use optimized call
459                             remoteAddress = socket.recvFromAddress(data.memoryAddress(), data.writerIndex(),
460                                                                  data.capacity());
461                         } else {
462                             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
463                             remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
464                         }
465 
466                         if (remoteAddress == null) {
467                             allocHandle.lastBytesRead(-1);
468                             data.release();
469                             data = null;
470                             break;
471                         }
472 
473                         allocHandle.incMessagesRead(1);
474                         allocHandle.lastBytesRead(remoteAddress.receivedAmount());
475                         data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
476 
477                         readPending = false;
478                         pipeline.fireChannelRead(
479                                 new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
480 
481                         data = null;
482                     } while (allocHandle.continueReading());
483                 } catch (Throwable t) {
484                     if (data != null) {
485                         data.release();
486                     }
487                     exception = t;
488                 }
489 
490                 allocHandle.readComplete();
491                 pipeline.fireChannelReadComplete();
492 
493                 if (exception != null) {
494                     pipeline.fireExceptionCaught(exception);
495                 }
496             } finally {
497                 epollInFinally(config);
498             }
499         }
500     }
501 }