View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.CompositeChannelBuffer;
20  import org.jboss.netty.channel.DefaultFileRegion;
21  import org.jboss.netty.channel.FileRegion;
22  import org.jboss.netty.util.ExternalResourceReleasable;
23  import org.jboss.netty.util.internal.ByteBufferUtil;
24  
25  import java.io.IOException;
26  import java.lang.ref.SoftReference;
27  import java.net.SocketAddress;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.GatheringByteChannel;
31  import java.nio.channels.WritableByteChannel;
32  
33  final class SocketSendBufferPool implements ExternalResourceReleasable {
34  
35      private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
36  
37      private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
38      private static final int ALIGN_SHIFT = 4;
39      private static final int ALIGN_MASK = 15;
40  
41      private PreallocationRef poolHead;
42      private Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
43  
44      SendBuffer acquire(Object message) {
45          if (message instanceof ChannelBuffer) {
46              return acquire((ChannelBuffer) message);
47          }
48          if (message instanceof FileRegion) {
49              return acquire((FileRegion) message);
50          }
51  
52          throw new IllegalArgumentException(
53                  "unsupported message type: " + message.getClass());
54      }
55  
56      private SendBuffer acquire(FileRegion src) {
57          if (src.getCount() == 0) {
58              return EMPTY_BUFFER;
59          }
60          return new FileSendBuffer(src);
61      }
62  
63      private SendBuffer acquire(ChannelBuffer src) {
64          final int size = src.readableBytes();
65          if (size == 0) {
66              return EMPTY_BUFFER;
67          }
68  
69          if (src instanceof CompositeChannelBuffer && ((CompositeChannelBuffer) src).useGathering()) {
70              return new GatheringSendBuffer(src.toByteBuffers());
71          }
72  
73          if (src.isDirect()) {
74              return new UnpooledSendBuffer(src.toByteBuffer());
75          }
76          if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
77              return new UnpooledSendBuffer(src.toByteBuffer());
78          }
79  
80          Preallocation current = this.current;
81          ByteBuffer buffer = current.buffer;
82          int remaining = buffer.remaining();
83          PooledSendBuffer dst;
84  
85          if (size < remaining) {
86              int nextPos = buffer.position() + size;
87              ByteBuffer slice = buffer.duplicate();
88              buffer.position(align(nextPos));
89              slice.limit(nextPos);
90              current.refCnt ++;
91              dst = new PooledSendBuffer(current, slice);
92          } else if (size > remaining) {
93              this.current = current = getPreallocation();
94              buffer = current.buffer;
95              ByteBuffer slice = buffer.duplicate();
96              buffer.position(align(size));
97              slice.limit(size);
98              current.refCnt ++;
99              dst = new PooledSendBuffer(current, slice);
100         } else { // size == remaining
101             current.refCnt ++;
102             this.current = getPreallocation0();
103             dst = new PooledSendBuffer(current, current.buffer);
104         }
105 
106         ByteBuffer dstbuf = dst.buffer;
107         dstbuf.mark();
108         src.getBytes(src.readerIndex(), dstbuf);
109         dstbuf.reset();
110         return dst;
111     }
112 
113     private Preallocation getPreallocation() {
114         Preallocation current = this.current;
115         if (current.refCnt == 0) {
116             current.buffer.clear();
117             return current;
118         }
119 
120         return getPreallocation0();
121     }
122 
123     private Preallocation getPreallocation0() {
124         PreallocationRef ref = poolHead;
125         if (ref != null) {
126             do {
127                 Preallocation p = ref.get();
128                 ref = ref.next;
129 
130                 if (p != null) {
131                     poolHead = ref;
132                     return p;
133                 }
134             } while (ref != null);
135 
136             poolHead = ref;
137         }
138 
139         return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
140     }
141 
142     private static int align(int pos) {
143         int q = pos >>> ALIGN_SHIFT;
144         int r = pos & ALIGN_MASK;
145         if (r != 0) {
146             q ++;
147         }
148         return q << ALIGN_SHIFT;
149     }
150 
151     private static final class Preallocation {
152         final ByteBuffer buffer;
153         int refCnt;
154 
155         Preallocation(int capacity) {
156             buffer = ByteBuffer.allocateDirect(capacity);
157         }
158     }
159 
160     private final class PreallocationRef extends SoftReference<Preallocation> {
161         final PreallocationRef next;
162 
163         PreallocationRef(Preallocation prealloation, PreallocationRef next) {
164             super(prealloation);
165             this.next = next;
166         }
167     }
168 
169     interface SendBuffer {
170         boolean finished();
171         long writtenBytes();
172         long totalBytes();
173 
174         long transferTo(WritableByteChannel ch) throws IOException;
175         long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
176 
177         void release();
178     }
179 
180     static class UnpooledSendBuffer implements SendBuffer {
181 
182         final ByteBuffer buffer;
183         final int initialPos;
184 
185         UnpooledSendBuffer(ByteBuffer buffer) {
186             this.buffer = buffer;
187             initialPos = buffer.position();
188         }
189 
190         public final boolean finished() {
191             return !buffer.hasRemaining();
192         }
193 
194         public final long writtenBytes() {
195             return buffer.position() - initialPos;
196         }
197 
198         public final long totalBytes() {
199             return buffer.limit() - initialPos;
200         }
201 
202         public final long transferTo(WritableByteChannel ch) throws IOException {
203             return ch.write(buffer);
204         }
205 
206         public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
207             return ch.send(buffer, raddr);
208         }
209 
210         public void release() {
211             // Unpooled.
212         }
213     }
214 
215     final class PooledSendBuffer extends UnpooledSendBuffer {
216 
217         private final Preallocation parent;
218 
219         PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
220             super(buffer);
221             this.parent = parent;
222         }
223 
224         @Override
225         public void release() {
226             final Preallocation parent = this.parent;
227             if (-- parent.refCnt == 0) {
228                 parent.buffer.clear();
229                 if (parent != current) {
230                     poolHead = new PreallocationRef(parent, poolHead);
231                 }
232             }
233         }
234     }
235 
236     static class GatheringSendBuffer implements SendBuffer {
237 
238         private final ByteBuffer[] buffers;
239         private final int last;
240         private long written;
241         private final int total;
242 
243         GatheringSendBuffer(ByteBuffer[] buffers) {
244             this.buffers = buffers;
245             last = buffers.length - 1;
246             int total = 0;
247             for (ByteBuffer buf: buffers) {
248                 total += buf.remaining();
249             }
250             this.total = total;
251         }
252 
253         public boolean finished() {
254             return !buffers[last].hasRemaining();
255         }
256 
257         public long writtenBytes() {
258             return written;
259         }
260 
261         public long totalBytes() {
262             return total;
263         }
264 
265         public long transferTo(WritableByteChannel ch) throws IOException {
266             if (ch instanceof GatheringByteChannel) {
267                  long w = ((GatheringByteChannel) ch).write(buffers);
268                  written += w;
269                  return w;
270             } else {
271                 int send = 0;
272                 for (ByteBuffer buf: buffers) {
273                     if (buf.hasRemaining()) {
274                         int w = ch.write(buf);
275                         if (w == 0) {
276                             break;
277                         } else {
278                             send += w;
279                         }
280                     }
281                 }
282                 written += send;
283                 return send;
284             }
285         }
286 
287         public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
288             int send = 0;
289             for (ByteBuffer buf: buffers) {
290                 if (buf.hasRemaining()) {
291                     int w = ch.send(buf, raddr);
292                     if (w == 0) {
293                         break;
294                     } else {
295                         send += w;
296                     }
297                 }
298             }
299             written += send;
300 
301             return send;
302         }
303 
304         public void release() {
305             // nothing todo
306         }
307     }
308 
309     final class FileSendBuffer implements SendBuffer {
310 
311         private final FileRegion file;
312         private long writtenBytes;
313 
314         FileSendBuffer(FileRegion file) {
315             this.file = file;
316         }
317 
318         public boolean finished() {
319             return writtenBytes >= file.getCount();
320         }
321 
322         public long writtenBytes() {
323             return writtenBytes;
324         }
325 
326         public long totalBytes() {
327             return file.getCount();
328         }
329 
330         public long transferTo(WritableByteChannel ch) throws IOException {
331             long localWrittenBytes = file.transferTo(ch, writtenBytes);
332             writtenBytes += localWrittenBytes;
333             return localWrittenBytes;
334         }
335 
336         public long transferTo(DatagramChannel ch, SocketAddress raddr) {
337             throw new UnsupportedOperationException();
338         }
339 
340         public void release() {
341             if (file instanceof DefaultFileRegion) {
342                 if (((DefaultFileRegion) file).releaseAfterTransfer()) {
343                     // Make sure the FileRegion resource are released otherwise it may cause a FD
344                     // leak or something similar
345                     file.releaseExternalResources();
346                 }
347             }
348         }
349     }
350 
351     static final class EmptySendBuffer implements SendBuffer {
352 
353         public boolean finished() {
354             return true;
355         }
356 
357         public long writtenBytes() {
358             return 0;
359         }
360 
361         public long totalBytes() {
362             return 0;
363         }
364 
365         public long transferTo(WritableByteChannel ch) {
366             return 0;
367         }
368 
369         public long transferTo(DatagramChannel ch, SocketAddress raddr) {
370             return 0;
371         }
372 
373         public void release() {
374             // Unpooled.
375         }
376     }
377 
378     public void releaseExternalResources() {
379         if (current.buffer != null) {
380             ByteBufferUtil.destroy(current.buffer);
381         }
382     }
383 }