View Javadoc

1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.socket.DatagramPacket;
22  import io.netty.util.concurrent.FastThreadLocal;
23  
24  import java.net.Inet6Address;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  
28  /**
29   * Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
30   */
31  final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
32  
33      private static final FastThreadLocal<NativeDatagramPacketArray> ARRAY =
34              new FastThreadLocal<NativeDatagramPacketArray>() {
35                  @Override
36                  protected NativeDatagramPacketArray initialValue() throws Exception {
37                      return new NativeDatagramPacketArray();
38                  }
39  
40                  @Override
41                  protected void onRemoval(NativeDatagramPacketArray value) throws Exception {
42                      NativeDatagramPacket[] packetsArray = value.packets;
43                      // Release all packets
44                      for (NativeDatagramPacket datagramPacket : packetsArray) {
45                          datagramPacket.release();
46                      }
47                  }
48              };
49  
50      // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
51      private final NativeDatagramPacket[] packets = new NativeDatagramPacket[Native.UIO_MAX_IOV];
52      private int count;
53  
54      private NativeDatagramPacketArray() {
55          for (int i = 0; i < packets.length; i++) {
56              packets[i] = new NativeDatagramPacket();
57          }
58      }
59  
60      /**
61       * Try to add the given {@link DatagramPacket}. Returns {@code true} on success,
62       * {@code false} otherwise.
63       */
64      boolean add(DatagramPacket packet) {
65          if (count == packets.length) {
66              return false;
67          }
68          ByteBuf content = packet.content();
69          int len = content.readableBytes();
70          if (len == 0) {
71              return true;
72          }
73          NativeDatagramPacket p = packets[count];
74          InetSocketAddress recipient = packet.recipient();
75          if (!p.init(content, recipient)) {
76              return false;
77          }
78  
79          count++;
80          return true;
81      }
82  
83      @Override
84      public boolean processMessage(Object msg) throws Exception {
85          return msg instanceof DatagramPacket && add((DatagramPacket) msg);
86      }
87  
88      /**
89       * Returns the count
90       */
91      int count() {
92          return count;
93      }
94  
95      /**
96       * Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
97       */
98      NativeDatagramPacket[] packets() {
99          return packets;
100     }
101 
102     /**
103      * Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
104      * {@link ChannelOutboundBuffer}.
105      */
106     static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
107         NativeDatagramPacketArray array = ARRAY.get();
108         array.count = 0;
109         buffer.forEachFlushedMessage(array);
110         return array;
111     }
112 
113     /**
114      * Used to pass needed data to JNI.
115      */
116     @SuppressWarnings("unused")
117     static final class NativeDatagramPacket {
118         // Each NativeDatagramPackets holds a IovArray which is used for gathering writes.
119         // This is ok as NativeDatagramPacketArray is always obtained via a FastThreadLocal and
120         // so the memory needed is quite small anyway.
121         private final IovArray array = new IovArray();
122 
123         // This is the actual struct iovec*
124         private long memoryAddress;
125         private int count;
126 
127         private byte[] addr;
128         private int scopeId;
129         private int port;
130 
131         private void release() {
132             array.release();
133         }
134 
135         /**
136          * Init this instance and return {@code true} if the init was successful.
137          */
138         private boolean init(ByteBuf buf, InetSocketAddress recipient) {
139             array.clear();
140             if (!array.add(buf)) {
141                 return false;
142             }
143             // always start from offset 0
144             memoryAddress = array.memoryAddress(0);
145             count = array.count();
146 
147             InetAddress address = recipient.getAddress();
148             if (address instanceof Inet6Address) {
149                 addr = address.getAddress();
150                 scopeId = ((Inet6Address) address).getScopeId();
151             } else {
152                 addr = ipv4MappedIpv6Address(address.getAddress());
153                 scopeId = 0;
154             }
155             port = recipient.getPort();
156             return true;
157         }
158     }
159 }