View Javadoc

1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AddressedEnvelope;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultAddressedEnvelope;
26  import io.netty.channel.RecvByteBufAllocator;
27  import io.netty.channel.socket.DatagramChannel;
28  import io.netty.channel.socket.DatagramChannelConfig;
29  import io.netty.channel.socket.DatagramPacket;
30  import io.netty.channel.unix.DatagramSocketAddress;
31  import io.netty.channel.unix.FileDescriptor;
32  import io.netty.channel.unix.Socket;
33  import io.netty.util.internal.StringUtil;
34  
35  import java.io.IOException;
36  import java.net.InetAddress;
37  import java.net.InetSocketAddress;
38  import java.net.NetworkInterface;
39  import java.net.SocketAddress;
40  import java.net.SocketException;
41  import java.nio.ByteBuffer;
42  
43  import static io.netty.channel.unix.Socket.newSocketDgram;
44  
45  /**
46   * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
47   * maximal performance.
48   */
49  public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
50      private static final ChannelMetadata METADATA = new ChannelMetadata(true);
51      private static final String EXPECTED_TYPES =
52              " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53              StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54              StringUtil.simpleClassName(ByteBuf.class) + ", " +
55              StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56              StringUtil.simpleClassName(ByteBuf.class) + ')';
57  
58      private final EpollDatagramChannelConfig config;
59      private volatile boolean connected;
60  
61      public EpollDatagramChannel() {
62          super(newSocketDgram(), Native.EPOLLIN);
63          config = new EpollDatagramChannelConfig(this);
64      }
65  
66      /**
67       * @deprecated Use {@link #EpollDatagramChannel(Socket)}.
68       */
69      @Deprecated
70      public EpollDatagramChannel(FileDescriptor fd) {
71          this(new Socket(fd.intValue()));
72      }
73  
74      public EpollDatagramChannel(Socket fd) {
75          super(null, fd, Native.EPOLLIN, true);
76          config = new EpollDatagramChannelConfig(this);
77      }
78  
79      @Override
80      public InetSocketAddress remoteAddress() {
81          return (InetSocketAddress) super.remoteAddress();
82      }
83  
84      @Override
85      public InetSocketAddress localAddress() {
86          return (InetSocketAddress) super.localAddress();
87      }
88  
89      @Override
90      public ChannelMetadata metadata() {
91          return METADATA;
92      }
93  
94      @Override
95      @SuppressWarnings("deprecation")
96      public boolean isActive() {
97          return fd().isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
98      }
99  
100     @Override
101     public boolean isConnected() {
102         return connected;
103     }
104 
105     @Override
106     public ChannelFuture joinGroup(InetAddress multicastAddress) {
107         return joinGroup(multicastAddress, newPromise());
108     }
109 
110     @Override
111     public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
112         try {
113             return joinGroup(
114                     multicastAddress,
115                     NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
116         } catch (SocketException e) {
117             promise.setFailure(e);
118         }
119         return promise;
120     }
121 
122     @Override
123     public ChannelFuture joinGroup(
124             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
125         return joinGroup(multicastAddress, networkInterface, newPromise());
126     }
127 
128     @Override
129     public ChannelFuture joinGroup(
130             InetSocketAddress multicastAddress, NetworkInterface networkInterface,
131             ChannelPromise promise) {
132         return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
133     }
134 
135     @Override
136     public ChannelFuture joinGroup(
137             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
138         return joinGroup(multicastAddress, networkInterface, source, newPromise());
139     }
140 
141     @Override
142     public ChannelFuture joinGroup(
143             final InetAddress multicastAddress, final NetworkInterface networkInterface,
144             final InetAddress source, final ChannelPromise promise) {
145 
146         if (multicastAddress == null) {
147             throw new NullPointerException("multicastAddress");
148         }
149 
150         if (networkInterface == null) {
151             throw new NullPointerException("networkInterface");
152         }
153 
154         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
155         return promise;
156     }
157 
158     @Override
159     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
160         return leaveGroup(multicastAddress, newPromise());
161     }
162 
163     @Override
164     public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
165         try {
166             return leaveGroup(
167                     multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
168         } catch (SocketException e) {
169             promise.setFailure(e);
170         }
171         return promise;
172     }
173 
174     @Override
175     public ChannelFuture leaveGroup(
176             InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
177         return leaveGroup(multicastAddress, networkInterface, newPromise());
178     }
179 
180     @Override
181     public ChannelFuture leaveGroup(
182             InetSocketAddress multicastAddress,
183             NetworkInterface networkInterface, ChannelPromise promise) {
184         return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
185     }
186 
187     @Override
188     public ChannelFuture leaveGroup(
189             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
190         return leaveGroup(multicastAddress, networkInterface, source, newPromise());
191     }
192 
193     @Override
194     public ChannelFuture leaveGroup(
195             final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
196             final ChannelPromise promise) {
197         if (multicastAddress == null) {
198             throw new NullPointerException("multicastAddress");
199         }
200         if (networkInterface == null) {
201             throw new NullPointerException("networkInterface");
202         }
203 
204         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
205 
206         return promise;
207     }
208 
209     @Override
210     public ChannelFuture block(
211             InetAddress multicastAddress, NetworkInterface networkInterface,
212             InetAddress sourceToBlock) {
213         return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
214     }
215 
216     @Override
217     public ChannelFuture block(
218             final InetAddress multicastAddress, final NetworkInterface networkInterface,
219             final InetAddress sourceToBlock, final ChannelPromise promise) {
220         if (multicastAddress == null) {
221             throw new NullPointerException("multicastAddress");
222         }
223         if (sourceToBlock == null) {
224             throw new NullPointerException("sourceToBlock");
225         }
226 
227         if (networkInterface == null) {
228             throw new NullPointerException("networkInterface");
229         }
230         promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
231         return promise;
232     }
233 
234     @Override
235     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
236         return block(multicastAddress, sourceToBlock, newPromise());
237     }
238 
239     @Override
240     public ChannelFuture block(
241             InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
242         try {
243             return block(
244                     multicastAddress,
245                     NetworkInterface.getByInetAddress(localAddress().getAddress()),
246                     sourceToBlock, promise);
247         } catch (Throwable e) {
248             promise.setFailure(e);
249         }
250         return promise;
251     }
252 
253     @Override
254     protected AbstractEpollUnsafe newUnsafe() {
255         return new EpollDatagramChannelUnsafe();
256     }
257 
258     @Override
259     protected void doBind(SocketAddress localAddress) throws Exception {
260         super.doBind(localAddress);
261         active = true;
262     }
263 
264     @Override
265     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
266         for (;;) {
267             Object msg = in.current();
268             if (msg == null) {
269                 // Wrote all messages.
270                 clearFlag(Native.EPOLLOUT);
271                 break;
272             }
273 
274             try {
275                 // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
276                 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
277                     NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
278                     int cnt = array.count();
279 
280                     if (cnt >= 1) {
281                         // Try to use gathering writes via sendmmsg(...) syscall.
282                         int offset = 0;
283                         NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
284 
285                         while (cnt > 0) {
286                             int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
287                             if (send == 0) {
288                                 // Did not write all messages.
289                                 setFlag(Native.EPOLLOUT);
290                                 return;
291                             }
292                             for (int i = 0; i < send; i++) {
293                                 in.remove();
294                             }
295                             cnt -= send;
296                             offset += send;
297                         }
298                         continue;
299                     }
300                 }
301                 boolean done = false;
302                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
303                     if (doWriteMessage(msg)) {
304                         done = true;
305                         break;
306                     }
307                 }
308 
309                 if (done) {
310                     in.remove();
311                 } else {
312                     // Did not write all messages.
313                     setFlag(Native.EPOLLOUT);
314                     break;
315                 }
316             } catch (IOException e) {
317                 // Continue on write error as a DatagramChannel can write to multiple remote peers
318                 //
319                 // See https://github.com/netty/netty/issues/2665
320                 in.remove(e);
321             }
322         }
323     }
324 
325     private boolean doWriteMessage(Object msg) throws Exception {
326         final ByteBuf data;
327         InetSocketAddress remoteAddress;
328         if (msg instanceof AddressedEnvelope) {
329             @SuppressWarnings("unchecked")
330             AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
331                     (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
332             data = envelope.content();
333             remoteAddress = envelope.recipient();
334         } else {
335             data = (ByteBuf) msg;
336             remoteAddress = null;
337         }
338 
339         final int dataLen = data.readableBytes();
340         if (dataLen == 0) {
341             return true;
342         }
343 
344         final long writtenBytes;
345         if (data.hasMemoryAddress()) {
346             long memoryAddress = data.memoryAddress();
347             if (remoteAddress == null) {
348                 writtenBytes = fd().writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
349             } else {
350                 writtenBytes = fd().sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
351                         remoteAddress.getAddress(), remoteAddress.getPort());
352             }
353         } else if (data.nioBufferCount() > 1) {
354             IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
355             array.add(data);
356             int cnt = array.count();
357             assert cnt != 0;
358 
359             if (remoteAddress == null) {
360                 writtenBytes = fd().writevAddresses(array.memoryAddress(0), cnt);
361             } else {
362                 writtenBytes = fd().sendToAddresses(array.memoryAddress(0), cnt,
363                         remoteAddress.getAddress(), remoteAddress.getPort());
364             }
365         } else  {
366             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
367             if (remoteAddress == null) {
368                 writtenBytes = fd().write(nioData, nioData.position(), nioData.limit());
369             } else {
370                 writtenBytes = fd().sendTo(nioData, nioData.position(), nioData.limit(),
371                         remoteAddress.getAddress(), remoteAddress.getPort());
372             }
373         }
374 
375         return writtenBytes > 0;
376     }
377 
378     @Override
379     protected Object filterOutboundMessage(Object msg) {
380         if (msg instanceof DatagramPacket) {
381             DatagramPacket packet = (DatagramPacket) msg;
382             ByteBuf content = packet.content();
383             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
384                     new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
385         }
386 
387         if (msg instanceof ByteBuf) {
388             ByteBuf buf = (ByteBuf) msg;
389             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
390         }
391 
392         if (msg instanceof AddressedEnvelope) {
393             @SuppressWarnings("unchecked")
394             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
395             if (e.content() instanceof ByteBuf &&
396                 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
397 
398                 ByteBuf content = (ByteBuf) e.content();
399                 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
400                         new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
401                             newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
402             }
403         }
404 
405         throw new UnsupportedOperationException(
406                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
407     }
408 
409     @Override
410     public EpollDatagramChannelConfig config() {
411         return config;
412     }
413 
414     @Override
415     protected void doDisconnect() throws Exception {
416         fd().disconnect();
417         connected = active = false;
418     }
419 
420     @Override
421     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
422         if (super.doConnect(remoteAddress, localAddress)) {
423             connected = true;
424             return true;
425         }
426         return false;
427     }
428 
429     @Override
430     protected void doClose() throws Exception {
431         super.doClose();
432         connected = false;
433     }
434 
435     final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
436         private RecvByteBufAllocator.Handle allocHandle;
437 
438         @Override
439         void epollInReady() {
440             assert eventLoop().inEventLoop();
441             if (fd().isInputShutdown()) {
442                 return;
443             }
444             DatagramChannelConfig config = config();
445             boolean edgeTriggered = isFlagSet(Native.EPOLLET);
446 
447             if (!readPending && !edgeTriggered && !config.isAutoRead()) {
448                 // ChannelConfig.setAutoRead(false) was called in the meantime
449                 clearEpollIn0();
450                 return;
451             }
452 
453             RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
454             if (allocHandle == null) {
455                 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
456             }
457 
458             final ChannelPipeline pipeline = pipeline();
459             Throwable exception = null;
460             try {
461                 // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
462                 final int maxMessagesPerRead = edgeTriggered
463                         ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
464                 int messages = 0;
465                 do {
466                     ByteBuf data = null;
467                     try {
468                         data = allocHandle.allocate(config.getAllocator());
469                         int writerIndex = data.writerIndex();
470                         DatagramSocketAddress remoteAddress;
471                         if (data.hasMemoryAddress()) {
472                             // has a memory address so use optimized call
473                             remoteAddress = fd().recvFromAddress(data.memoryAddress(), data.writerIndex(),
474                                     data.capacity());
475                         } else {
476                             ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
477                             remoteAddress = fd().recvFrom(nioData, nioData.position(), nioData.limit());
478                         }
479 
480                         if (remoteAddress == null) {
481                             break;
482                         }
483 
484                         int readBytes = remoteAddress.receivedAmount();
485                         data.writerIndex(data.writerIndex() + readBytes);
486                         allocHandle.record(readBytes);
487                         readPending = false;
488 
489                         readPending = false;
490                         pipeline.fireChannelRead(
491                                 new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
492 
493                         data = null;
494                     } catch (Throwable t) {
495                         // We do not break from the loop here and remember the last exception,
496                         // because we need to consume everything from the socket used with epoll ET.
497                         exception = t;
498                     } finally {
499                         if (data != null) {
500                             data.release();
501                         }
502                         if (!edgeTriggered && !config.isAutoRead()) {
503                             // This is not using EPOLLET so we can stop reading
504                             // ASAP as we will get notified again later with
505                             // pending data
506                             break;
507                         }
508                     }
509                 } while (++ messages < maxMessagesPerRead || isRdHup());
510 
511                 pipeline.fireChannelReadComplete();
512 
513                 if (exception != null) {
514                     pipeline.fireExceptionCaught(exception);
515                 }
516             } finally {
517                 // Check if there is a readPending which was not processed yet.
518                 // This could be for two reasons:
519                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
520                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
521                 //
522                 // See https://github.com/netty/netty/issues/2254
523                 if (!readPending && !config.isAutoRead()) {
524                     clearEpollIn();
525                 }
526             }
527         }
528     }
529 }