View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelOutboundBuffer;
20  import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
21  import io.netty.channel.socket.DatagramPacket;
22  import io.netty.channel.unix.IovArray;
23  import io.netty.channel.unix.Limits;
24  import io.netty.channel.unix.SegmentedDatagramPacket;
25  import io.netty.util.internal.UnstableApi;
26  
27  import java.net.Inet6Address;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.UnknownHostException;
31  
32  import static io.netty.channel.unix.Limits.UIO_MAX_IOV;
33  import static io.netty.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address;
34  
35  /**
36   * Support <a href="https://linux.die.net//man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
37   */
38  final class NativeDatagramPacketArray {
39  
40      // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
41      private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
42  
43      // We share one IovArray for all NativeDatagramPackets to reduce memory overhead. This will allow us to write
44      // up to IOV_MAX iovec across all messages in one sendmmsg(...) call.
45      private final IovArray iovArray = new IovArray();
46  
47      // temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it.
48      private final byte[] ipv4Bytes = new byte[4];
49      private final MyMessageProcessor processor = new MyMessageProcessor();
50  
51      private int count;
52  
53      NativeDatagramPacketArray() {
54          for (int i = 0; i < packets.length; i++) {
55              packets[i] = new NativeDatagramPacket();
56          }
57      }
58  
59      boolean addWritable(ByteBuf buf, int index, int len) {
60          return add0(buf, index, len, 0, null);
61      }
62  
63      private boolean add0(ByteBuf buf, int index, int len, int segmentLen, InetSocketAddress recipient) {
64          if (count == packets.length) {
65              // We already filled up to UIO_MAX_IOV messages. This is the max allowed per
66              // recvmmsg(...) / sendmmsg(...) call, we will try again later.
67              return false;
68          }
69          if (len == 0) {
70              return true;
71          }
72          int offset = iovArray.count();
73          if (offset == Limits.IOV_MAX || !iovArray.add(buf, index, len)) {
74              // Not enough space to hold the whole content, we will try again later.
75              return false;
76          }
77          NativeDatagramPacket p = packets[count];
78          p.init(iovArray.memoryAddress(offset), iovArray.count() - offset, segmentLen, recipient);
79  
80          count++;
81          return true;
82      }
83  
84      void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception {
85          processor.connected = connected;
86          processor.maxMessagesPerWrite = maxMessagesPerWrite;
87          buffer.forEachFlushedMessage(processor);
88      }
89  
90      /**
91       * Returns the count
92       */
93      int count() {
94          return count;
95      }
96  
97      /**
98       * Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
99       */
100     NativeDatagramPacket[] packets() {
101         return packets;
102     }
103 
104     void clear() {
105         this.count = 0;
106         this.iovArray.clear();
107     }
108 
109     void release() {
110         iovArray.release();
111     }
112 
113     private final class MyMessageProcessor implements MessageProcessor {
114         private boolean connected;
115         private int maxMessagesPerWrite;
116 
117         @Override
118         public boolean processMessage(Object msg) {
119             final boolean added;
120             if (msg instanceof DatagramPacket) {
121                 DatagramPacket packet = (DatagramPacket) msg;
122                 ByteBuf buf = packet.content();
123                 int segmentSize = 0;
124                 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
125                     int seg = ((io.netty.channel.unix.SegmentedDatagramPacket) packet).segmentSize();
126                     // We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple
127                     // segments in the packet.
128                     if (buf.readableBytes() > seg) {
129                         segmentSize = seg;
130                     }
131                 }
132                 added = add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient());
133             } else if (msg instanceof ByteBuf && connected) {
134                 ByteBuf buf = (ByteBuf) msg;
135                 added = add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null);
136             } else {
137                 added = false;
138             }
139             if (added) {
140                 maxMessagesPerWrite--;
141                 return maxMessagesPerWrite > 0;
142             }
143             return false;
144         }
145     }
146 
147     private static InetSocketAddress newAddress(byte[] addr, int addrLen, int port, int scopeId, byte[] ipv4Bytes)
148             throws UnknownHostException {
149         final InetAddress address;
150         if (addrLen == ipv4Bytes.length) {
151             System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
152             address = InetAddress.getByAddress(ipv4Bytes);
153         } else {
154             address = Inet6Address.getByAddress(null, addr, scopeId);
155         }
156         return new InetSocketAddress(address, port);
157     }
158 
159     /**
160      * Used to pass needed data to JNI.
161      */
162     @SuppressWarnings("unused")
163     @UnstableApi
164     public final class NativeDatagramPacket {
165 
166         // IMPORTANT: Most of the below variables are accessed via JNI. Be aware if you change any of these you also
167         // need to change these in the related .c file!
168 
169         // This is the actual struct iovec*
170         private long memoryAddress;
171         private int count;
172 
173         private final byte[] senderAddr = new byte[16];
174         private int senderAddrLen;
175         private int senderScopeId;
176         private int senderPort;
177 
178         private final byte[] recipientAddr = new byte[16];
179         private int recipientAddrLen;
180         private int recipientScopeId;
181         private int recipientPort;
182 
183         private int segmentSize;
184 
185         private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) {
186             this.memoryAddress = memoryAddress;
187             this.count = count;
188             this.segmentSize = segmentSize;
189 
190             this.senderScopeId = 0;
191             this.senderPort = 0;
192             this.senderAddrLen = 0;
193 
194             if (recipient == null) {
195                 this.recipientScopeId = 0;
196                 this.recipientPort = 0;
197                 this.recipientAddrLen = 0;
198             } else {
199                 InetAddress address = recipient.getAddress();
200                 if (address instanceof Inet6Address) {
201                     System.arraycopy(address.getAddress(), 0, recipientAddr, 0, recipientAddr.length);
202                     recipientScopeId = ((Inet6Address) address).getScopeId();
203                 } else {
204                     copyIpv4MappedIpv6Address(address.getAddress(), recipientAddr);
205                     recipientScopeId = 0;
206                 }
207                 recipientAddrLen = recipientAddr.length;
208                 recipientPort = recipient.getPort();
209             }
210         }
211 
212         boolean hasSender() {
213             return senderPort > 0;
214         }
215 
216         DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException {
217             InetSocketAddress sender = newAddress(senderAddr, senderAddrLen, senderPort, senderScopeId, ipv4Bytes);
218             if (recipientAddrLen != 0) {
219                 recipient = newAddress(recipientAddr, recipientAddrLen, recipientPort, recipientScopeId, ipv4Bytes);
220             }
221 
222             // Slice out the buffer with the correct length.
223             ByteBuf slice = buffer.retainedSlice(buffer.readerIndex(), count);
224 
225             // UDP_GRO
226             if (segmentSize > 0) {
227                 return new SegmentedDatagramPacket(slice, segmentSize, recipient, sender);
228             }
229             return new DatagramPacket(slice, recipient, sender);
230         }
231     }
232 }