View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelOutboundBuffer;
20  import io.netty.channel.socket.DatagramPacket;
21  import io.netty.util.concurrent.FastThreadLocal;
22  
23  import java.net.Inet6Address;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  
27  /**
28   * Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
29   */
30  final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
31  
32      private static final FastThreadLocal<NativeDatagramPacketArray> ARRAY =
33              new FastThreadLocal<NativeDatagramPacketArray>() {
34                  @Override
35                  protected NativeDatagramPacketArray initialValue() throws Exception {
36                      return new NativeDatagramPacketArray();
37                  }
38  
39                  @Override
40                  protected void onRemoval(NativeDatagramPacketArray value) throws Exception {
41                      NativeDatagramPacket[] array = value.packets;
42                      // Release all packets
43                      for (int i = 0; i < array.length; i++) {
44                          array[i].release();
45                      }
46                  }
47              };
48  
49      // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
50      private final NativeDatagramPacket[] packets = new NativeDatagramPacket[Native.UIO_MAX_IOV];
51      private int count;
52  
53      private NativeDatagramPacketArray() {
54          for (int i = 0; i < packets.length; i++) {
55              packets[i] = new NativeDatagramPacket();
56          }
57      }
58  
59      /**
60       * Try to add the given {@link DatagramPacket}. Returns {@code true} on success,
61       * {@code false} otherwise.
62       */
63      boolean add(DatagramPacket packet) {
64          if (count == packets.length) {
65              return false;
66          }
67          ByteBuf content = packet.content();
68          int len = content.readableBytes();
69          if (len == 0) {
70              return true;
71          }
72          NativeDatagramPacket p = packets[count];
73          InetSocketAddress recipient = packet.recipient();
74          if (!p.init(content, recipient)) {
75              return false;
76          }
77  
78          count++;
79          return true;
80      }
81  
82      @Override
83      public boolean processMessage(Object msg) throws Exception {
84          return msg instanceof DatagramPacket && add((DatagramPacket) msg);
85      }
86  
87      /**
88       * Returns the count
89       */
90      int count() {
91          return count;
92      }
93  
94      /**
95       * Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
96       */
97      NativeDatagramPacket[] packets() {
98          return packets;
99      }
100 
101     /**
102      * Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
103      * {@link ChannelOutboundBuffer}.
104      */
105     static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
106         NativeDatagramPacketArray array = ARRAY.get();
107         array.count = 0;
108         buffer.forEachFlushedMessage(array);
109         return array;
110     }
111 
112     /**
113      * Used to pass needed data to JNI.
114      */
115     @SuppressWarnings("unused")
116     static final class NativeDatagramPacket {
117         // Each NativeDatagramPackets holds a IovArray which is used for gathering writes.
118         // This is ok as NativeDatagramPacketArray is always obtained via a FastThreadLocal and
119         // so the memory needed is quite small anyway.
120         private final IovArray array = new IovArray();
121 
122         // This is the actual struct iovec*
123         private long memoryAddress;
124         private int count;
125 
126         private byte[] addr;
127         private int scopeId;
128         private int port;
129 
130         private void release() {
131             array.release();
132         }
133 
134         /**
135          * Init this instance and return {@code true} if the init was successful.
136          */
137         private boolean init(ByteBuf buf, InetSocketAddress recipient) {
138             array.clear();
139             if (!array.add(buf)) {
140                 return false;
141             }
142             // always start from offset 0
143             memoryAddress = array.memoryAddress(0);
144             count = array.count();
145 
146             InetAddress address = recipient.getAddress();
147             if (address instanceof Inet6Address) {
148                 addr = address.getAddress();
149                 scopeId = ((Inet6Address) address).getScopeId();
150             } else {
151                 addr = Native.ipv4MappedIpv6Address(address.getAddress());
152                 scopeId = 0;
153             }
154             port = recipient.getPort();
155             return true;
156         }
157     }
158 }