View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultAddressedEnvelope;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.unix.DatagramSocketAddress;
31  import io.netty.channel.unix.Errors;
32  import io.netty.channel.unix.IovArray;
33  import io.netty.channel.unix.UnixChannelUtil;
34  import io.netty.util.internal.StringUtil;
35  
36  import java.io.IOException;
37  import java.net.InetAddress;
38  import java.net.InetSocketAddress;
39  import java.net.NetworkInterface;
40  import java.net.PortUnreachableException;
41  import java.net.SocketAddress;
42  import java.net.SocketException;
43  import java.nio.ByteBuffer;
44  
45  import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
46  
47  /**
48   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
49   * maximal performance.
50   */
51  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
52      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
53      private static final String EXPECTED_TYPES =
54              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
55              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
56              StringUtil.simpleClassName(ByteBuf.class) + ", " +
57              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
58              StringUtil.simpleClassName(ByteBuf.class) + ')';
59  
60      private final EpollDatagramChannelConfig config;
61      private volatile boolean connected;
62  
63      public EpollDatagramChannel() {
64          super(newSocketDgram());
65          config = new EpollDatagramChannelConfig(this);
66      }
67  
68      public EpollDatagramChannel(int fd) {
69          this(new LinuxSocket(fd));
70      }
71  
72      EpollDatagramChannel(LinuxSocket fd) {
73          super(null, fd, true);
74          config = new EpollDatagramChannelConfig(this);
75      }
76  
77      @Override
78      public InetSocketAddress remoteAddress() {
79          return (InetSocketAddress) super.remoteAddress();
80      }
81  
82      @Override
83      public InetSocketAddress localAddress() {
84          return (InetSocketAddress) super.localAddress();
85      }
86  
87      @Override
88      public ChannelMetadata metadata() {
89          return METADATA;
90      }
91  
92      @Override
93      @SuppressWarnings("deprecation")
94      public boolean isActive() {
95          return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
96      }
97  
98      @Override
99      public boolean isConnected() {
100         return connected;
101     }
102 
103     @Override
104     public ChannelFuture joinGroup(InetAddress multicastAddress) {
105         return joinGroup(multicastAddress, newPromise());
106     }
107 
108     @Override
109     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
110         try {
111             return joinGroup(
112                     multicastAddress,
113                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
114         } catch (SocketException e) {
115             promise.setFailure(e);
116         }
117         return promise;
118     }
119 
120     @Override
121     public ChannelFuture joinGroup(
122             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
123         return joinGroup(multicastAddress, networkInterface, newPromise());
124     }
125 
126     @Override
127     public ChannelFuture joinGroup(
128             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
129             ChannelPromise promise) {
130         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
131     }
132 
133     @Override
134     public ChannelFuture joinGroup(
135             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
136         return joinGroup(multicastAddress, networkInterface, source, newPromise());
137     }
138 
139     @Override
140     public ChannelFuture joinGroup(
141             final InetAddress multicastAddress, final NetworkInterface networkInterface,
142             final InetAddress source, final ChannelPromise promise) {
143 
144         if (multicastAddress == null) {
145             throw new NullPointerException("multicastAddress");
146         }
147 
148         if (networkInterface == null) {
149             throw new NullPointerException("networkInterface");
150         }
151 
152         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
153         return promise;
154     }
155 
156     @Override
157     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
158         return leaveGroup(multicastAddress, newPromise());
159     }
160 
161     @Override
162     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
163         try {
164             return leaveGroup(
165                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
166         } catch (SocketException e) {
167             promise.setFailure(e);
168         }
169         return promise;
170     }
171 
172     @Override
173     public ChannelFuture leaveGroup(
174             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
175         return leaveGroup(multicastAddress, networkInterface, newPromise());
176     }
177 
178     @Override
179     public ChannelFuture leaveGroup(
180             InetSocketAddress multicastAddress,
181             NetworkInterface networkInterface, ChannelPromise promise) {
182         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
183     }
184 
185     @Override
186     public ChannelFuture leaveGroup(
187             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
188         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
189     }
190 
191     @Override
192     public ChannelFuture leaveGroup(
193             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
194             final ChannelPromise promise) {
195         if (multicastAddress == null) {
196             throw new NullPointerException("multicastAddress");
197         }
198         if (networkInterface == null) {
199             throw new NullPointerException("networkInterface");
200         }
201 
202         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
203 
204         return promise;
205     }
206 
207     @Override
208     public ChannelFuture block(
209             InetAddress multicastAddress, NetworkInterface networkInterface,
210             InetAddress sourceToBlock) {
211         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
212     }
213 
214     @Override
215     public ChannelFuture block(
216             final InetAddress multicastAddress, final NetworkInterface networkInterface,
217             final InetAddress sourceToBlock, final ChannelPromise promise) {
218         if (multicastAddress == null) {
219             throw new NullPointerException("multicastAddress");
220         }
221         if (sourceToBlock == null) {
222             throw new NullPointerException("sourceToBlock");
223         }
224 
225         if (networkInterface == null) {
226             throw new NullPointerException("networkInterface");
227         }
228         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
229         return promise;
230     }
231 
232     @Override
233     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
234         return block(multicastAddress, sourceToBlock, newPromise());
235     }
236 
237     @Override
238     public ChannelFuture block(
239             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
240         try {
241             return block(
242                     multicastAddress,
243                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
244                     sourceToBlock, promise);
245         } catch (Throwable e) {
246             promise.setFailure(e);
247         }
248         return promise;
249     }
250 
251     @Override
252     protected AbstractEpollUnsafe newUnsafe() {
253         return new EpollDatagramChannelUnsafe();
254     }
255 
256     @Override
257     protected void doBind(SocketAddress localAddress) throws Exception {
258         super.doBind(localAddress);
259         active = true;
260     }
261 
262     @Override
263     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
264         for (;;) {
265             Object msg = in.current();
266             if (msg == null) {
267                 // Wrote all messages.
268                 clearFlag(Native.EPOLLOUT);
269                 break;
270             }
271 
272             try {
273                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
274                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
275                     NativeDatagramPacketArray array = ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
276                     in.forEachFlushedMessage(array);
277                     int cnt = array.count();
278 
279                     if (cnt >= 1) {
280                         // Try to use gathering writes via sendmmsg(...) syscall.
281                         int offset = 0;
282                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
283 
284                         while (cnt > 0) {
285                             int send = Native.sendmmsg(socket.intValue(), packets, offset, cnt);
286                             if (send == 0) {
287                                 // Did not write all messages.
288                                 setFlag(Native.EPOLLOUT);
289                                 return;
290                             }
291                             for (int i = 0; i < send; i++) {
292                                 in.remove();
293                             }
294                             cnt -= send;
295                             offset += send;
296                         }
297                         continue;
298                     }
299                 }
300                 boolean done = false;
301                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
302                     if (doWriteMessage(msg)) {
303                         done = true;
304                         break;
305                     }
306                 }
307 
308                 if (done) {
309                     in.remove();
310                 } else {
311                     // Did not write all messages.
312                     setFlag(Native.EPOLLOUT);
313                     break;
314                 }
315             } catch (IOException e) {
316                 // Continue on write error as a DatagramChannel can write to multiple remote peers
317                 //
318                 // See https://github.com/netty/netty/issues/2665
319                 in.remove(e);
320             }
321         }
322     }
323 
324     private boolean doWriteMessage(Object msg) throws Exception {
325         final ByteBuf data;
326         InetSocketAddress remoteAddress;
327         if (msg instanceof AddressedEnvelope) {
328             @SuppressWarnings("unchecked")
329             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
330                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
331             data = envelope.content();
332             remoteAddress = envelope.recipient();
333         } else {
334             data = (ByteBuf) msg;
335             remoteAddress = null;
336         }
337 
338         final int dataLen = data.readableBytes();
339         if (dataLen == 0) {
340             return true;
341         }
342 
343         final long writtenBytes;
344         if (data.hasMemoryAddress()) {
345             long memoryAddress = data.memoryAddress();
346             if (remoteAddress == null) {
347                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
348             } else {
349                 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
350                         remoteAddress.getAddress(), remoteAddress.getPort());
351             }
352         } else if (data.nioBufferCount() > 1) {
353             IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
354             array.add(data);
355             int cnt = array.count();
356             assert cnt != 0;
357 
358             if (remoteAddress == null) {
359                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
360             } else {
361                 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
362                         remoteAddress.getAddress(), remoteAddress.getPort());
363             }
364         } else  {
365             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
366             if (remoteAddress == null) {
367                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
368             } else {
369                 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
370                         remoteAddress.getAddress(), remoteAddress.getPort());
371             }
372         }
373 
374         return writtenBytes > 0;
375     }
376 
377     @Override
378     protected Object filterOutboundMessage(Object msg) {
379         if (msg instanceof DatagramPacket) {
380             DatagramPacket packet = (DatagramPacket) msg;
381             ByteBuf content = packet.content();
382             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
383                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
384         }
385 
386         if (msg instanceof ByteBuf) {
387             ByteBuf buf = (ByteBuf) msg;
388             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
389         }
390 
391         if (msg instanceof AddressedEnvelope) {
392             @SuppressWarnings("unchecked")
393             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
394             if (e.content() instanceof ByteBuf &&
395                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
396 
397                 ByteBuf content = (ByteBuf) e.content();
398                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
399                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
400                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
401             }
402         }
403 
404         throw new UnsupportedOperationException(
405                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
406     }
407 
408     @Override
409     public EpollDatagramChannelConfig config() {
410         return config;
411     }
412 
413     @Override
414     protected void doDisconnect() throws Exception {
415         socket.disconnect();
416         connected = active = false;
417     }
418 
419     @Override
420     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
421         if (super.doConnect(remoteAddress, localAddress)) {
422             connected = true;
423             return true;
424         }
425         return false;
426     }
427 
428     @Override
429     protected void doClose() throws Exception {
430         super.doClose();
431         connected = false;
432     }
433 
434     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
435 
436         @Override
437         void epollInReady() {
438             assert eventLoop().inEventLoop();
439             DatagramChannelConfig config = config();
440             if (shouldBreakEpollInReady(config)) {
441                 clearEpollIn0();
442                 return;
443             }
444             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
445             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
446 
447             final ChannelPipeline pipeline = pipeline();
448             final ByteBufAllocator allocator = config.getAllocator();
449             allocHandle.reset(config);
450             epollInBefore();
451 
452             Throwable exception = null;
453             try {
454                 ByteBuf byteBuf = null;
455                 try {
456                     boolean connected = isConnected();
457                     do {
458                         byteBuf = allocHandle.allocate(allocator);
459                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
460 
461                         final DatagramPacket packet;
462                         if (connected) {
463                             try {
464                                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
465                             } catch (Errors.NativeIoException e) {
466                                 // We need to correctly translate connect errors to match NIO behaviour.
467                                 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
468                                     PortUnreachableException error = new PortUnreachableException(e.getMessage());
469                                     error.initCause(e);
470                                     throw error;
471                                 }
472                                 throw e;
473                             }
474                             if (allocHandle.lastBytesRead() <= 0) {
475                                 // nothing was read, release the buffer.
476                                 byteBuf.release();
477                                 byteBuf = null;
478                                 break;
479                             }
480                             packet = new DatagramPacket(
481                                     byteBuf, (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
482                         } else {
483                             final DatagramSocketAddress remoteAddress;
484                             if (byteBuf.hasMemoryAddress()) {
485                                 // has a memory address so use optimized call
486                                 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
487                                         byteBuf.capacity());
488                             } else {
489                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
490                                         byteBuf.writerIndex(), byteBuf.writableBytes());
491                                 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
492                             }
493 
494                             if (remoteAddress == null) {
495                                 allocHandle.lastBytesRead(-1);
496                                 byteBuf.release();
497                                 byteBuf = null;
498                                 break;
499                             }
500                             InetSocketAddress localAddress = remoteAddress.localAddress();
501                             if (localAddress == null) {
502                                 localAddress = (InetSocketAddress) localAddress();
503                             }
504                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
505                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
506 
507                             packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
508                         }
509 
510                         allocHandle.incMessagesRead(1);
511 
512                         readPending = false;
513                         pipeline.fireChannelRead(packet);
514 
515                         byteBuf = null;
516                     } while (allocHandle.continueReading());
517                 } catch (Throwable t) {
518                     if (byteBuf != null) {
519                         byteBuf.release();
520                     }
521                     exception = t;
522                 }
523 
524                 allocHandle.readComplete();
525                 pipeline.fireChannelReadComplete();
526 
527                 if (exception != null) {
528                     pipeline.fireExceptionCaught(exception);
529                 }
530             } finally {
531                 epollInFinally(config);
532             }
533         }
534     }
535 }