View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelOutboundBuffer;
20  import io.netty.channel.socket.DatagramPacket;
21  import io.netty.channel.unix.IovArray;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import java.net.Inet6Address;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  
27  import static io.netty.channel.unix.Limits.UIO_MAX_IOV;
28  import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address;
29  
30  /**
31   * Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
32   */
33  final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
34  
35      private static final FastThreadLocal<NativeDatagramPacketArray> ARRAY =
36              new FastThreadLocal<NativeDatagramPacketArray>() {
37                  @Override
38                  protected NativeDatagramPacketArray initialValue() throws Exception {
39                      return new NativeDatagramPacketArray();
40                  }
41  
42                  @Override
43                  protected void onRemoval(NativeDatagramPacketArray value) throws Exception {
44                      NativeDatagramPacket[] packetsArray = value.packets;
45                      // Release all packets
46                      for (NativeDatagramPacket datagramPacket : packetsArray) {
47                          datagramPacket.release();
48                      }
49                  }
50              };
51  
52      // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
53      private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
54      private int count;
55  
56      private NativeDatagramPacketArray() {
57          for (int i = 0; i < packets.length; i++) {
58              packets[i] = new NativeDatagramPacket();
59          }
60      }
61  
62      /**
63       * Try to add the given {@link DatagramPacket}. Returns {@code true} on success,
64       * {@code false} otherwise.
65       */
66      boolean add(DatagramPacket packet) {
67          if (count == packets.length) {
68              return false;
69          }
70          ByteBuf content = packet.content();
71          int len = content.readableBytes();
72          if (len == 0) {
73              return true;
74          }
75          NativeDatagramPacket p = packets[count];
76          InetSocketAddress recipient = packet.recipient();
77          if (!p.init(content, recipient)) {
78              return false;
79          }
80  
81          count++;
82          return true;
83      }
84  
85      @Override
86      public boolean processMessage(Object msg) throws Exception {
87          return msg instanceof DatagramPacket && add((DatagramPacket) msg);
88      }
89  
90      /**
91       * Returns the count
92       */
93      int count() {
94          return count;
95      }
96  
97      /**
98       * Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
99       */
100     NativeDatagramPacket[] packets() {
101         return packets;
102     }
103 
104     /**
105      * Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
106      * {@link ChannelOutboundBuffer}.
107      */
108     static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
109         NativeDatagramPacketArray array = ARRAY.get();
110         array.count = 0;
111         buffer.forEachFlushedMessage(array);
112         return array;
113     }
114 
115     /**
116      * Used to pass needed data to JNI.
117      */
118     @SuppressWarnings("unused")
119     static final class NativeDatagramPacket {
120         // Each NativeDatagramPackets holds a IovArray which is used for gathering writes.
121         // This is ok as NativeDatagramPacketArray is always obtained via a FastThreadLocal and
122         // so the memory needed is quite small anyway.
123         private final IovArray array = new IovArray();
124 
125         // This is the actual struct iovec*
126         private long memoryAddress;
127         private int count;
128 
129         private byte[] addr;
130         private int scopeId;
131         private int port;
132 
133         private void release() {
134             array.release();
135         }
136 
137         /**
138          * Init this instance and return {@code true} if the init was successful.
139          */
140         private boolean init(ByteBuf buf, InetSocketAddress recipient) {
141             array.clear();
142             if (!array.add(buf)) {
143                 return false;
144             }
145             // always start from offset 0
146             memoryAddress = array.memoryAddress(0);
147             count = array.count();
148 
149             InetAddress address = recipient.getAddress();
150             if (address instanceof Inet6Address) {
151                 addr = address.getAddress();
152                 scopeId = ((Inet6Address) address).getScopeId();
153             } else {
154                 addr = ipv4MappedIpv6Address(address.getAddress());
155                 scopeId = 0;
156             }
157             port = recipient.getPort();
158             return true;
159         }
160     }
161 }