View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.CompositeChannelBuffer;
20  import org.jboss.netty.channel.DefaultFileRegion;
21  import org.jboss.netty.channel.FileRegion;
22  import org.jboss.netty.util.ExternalResourceReleasable;
23  import org.jboss.netty.util.internal.ByteBufferUtil;
24  
25  import java.io.IOException;
26  import java.lang.ref.SoftReference;
27  import java.net.SocketAddress;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.GatheringByteChannel;
31  import java.nio.channels.WritableByteChannel;
32  
33  final class SocketSendBufferPool implements ExternalResourceReleasable {
34  
35      private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
36  
37      private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
38      private static final int ALIGN_SHIFT = 4;
39      private static final int ALIGN_MASK = 15;
40  
41      PreallocationRef poolHead;
42      Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
43  
44      SendBuffer acquire(Object message) {
45          if (message instanceof ChannelBuffer) {
46              return acquire((ChannelBuffer) message);
47          } else if (message instanceof FileRegion) {
48              return acquire((FileRegion) message);
49          }
50  
51          throw new IllegalArgumentException(
52                  "unsupported message type: " + message.getClass());
53      }
54  
55      private SendBuffer acquire(FileRegion src) {
56          if (src.getCount() == 0) {
57              return EMPTY_BUFFER;
58          }
59          return new FileSendBuffer(src);
60      }
61  
62      private SendBuffer acquire(ChannelBuffer src) {
63          final int size = src.readableBytes();
64          if (size == 0) {
65              return EMPTY_BUFFER;
66          }
67  
68  
69          if (src instanceof CompositeChannelBuffer && ((CompositeChannelBuffer) src).useGathering()) {
70              return new GatheringSendBuffer(src.toByteBuffers());
71          }
72  
73          if (src.isDirect()) {
74              return new UnpooledSendBuffer(src.toByteBuffer());
75          }
76          if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
77              return new UnpooledSendBuffer(src.toByteBuffer());
78          }
79  
80          Preallocation current = this.current;
81          ByteBuffer buffer = current.buffer;
82          int remaining = buffer.remaining();
83          PooledSendBuffer dst;
84  
85          if (size < remaining) {
86              int nextPos = buffer.position() + size;
87              ByteBuffer slice = buffer.duplicate();
88              buffer.position(align(nextPos));
89              slice.limit(nextPos);
90              current.refCnt ++;
91              dst = new PooledSendBuffer(current, slice);
92          } else if (size > remaining) {
93              this.current = current = getPreallocation();
94              buffer = current.buffer;
95              ByteBuffer slice = buffer.duplicate();
96              buffer.position(align(size));
97              slice.limit(size);
98              current.refCnt ++;
99              dst = new PooledSendBuffer(current, slice);
100         } else { // size == remaining
101             current.refCnt ++;
102             this.current = getPreallocation0();
103             dst = new PooledSendBuffer(current, current.buffer);
104         }
105 
106         ByteBuffer dstbuf = dst.buffer;
107         dstbuf.mark();
108         src.getBytes(src.readerIndex(), dstbuf);
109         dstbuf.reset();
110         return dst;
111     }
112 
113     private Preallocation getPreallocation() {
114         Preallocation current = this.current;
115         if (current.refCnt == 0) {
116             current.buffer.clear();
117             return current;
118         }
119 
120         return getPreallocation0();
121     }
122 
123     private Preallocation getPreallocation0() {
124         PreallocationRef ref = poolHead;
125         if (ref != null) {
126             do {
127                 Preallocation p = ref.get();
128                 ref = ref.next;
129 
130                 if (p != null) {
131                     poolHead = ref;
132                     return p;
133                 }
134             } while (ref != null);
135 
136             poolHead = ref;
137         }
138 
139         return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
140     }
141 
142     private static int align(int pos) {
143         int q = pos >>> ALIGN_SHIFT;
144         int r = pos & ALIGN_MASK;
145         if (r != 0) {
146             q ++;
147         }
148         return q << ALIGN_SHIFT;
149     }
150 
151     private static final class Preallocation {
152         final ByteBuffer buffer;
153         int refCnt;
154 
155         Preallocation(int capacity) {
156             buffer = ByteBuffer.allocateDirect(capacity);
157         }
158     }
159 
160     private final class PreallocationRef extends SoftReference<Preallocation> {
161         final PreallocationRef next;
162 
163         PreallocationRef(Preallocation prealloation, PreallocationRef next) {
164             super(prealloation);
165             this.next = next;
166         }
167     }
168 
169     interface SendBuffer {
170         boolean finished();
171         long writtenBytes();
172         long totalBytes();
173 
174         long transferTo(WritableByteChannel ch) throws IOException;
175         long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
176 
177         void release();
178     }
179 
180     static class UnpooledSendBuffer implements SendBuffer {
181 
182         final ByteBuffer buffer;
183         final int initialPos;
184 
185         UnpooledSendBuffer(ByteBuffer buffer) {
186             this.buffer = buffer;
187             initialPos = buffer.position();
188         }
189 
190         public final boolean finished() {
191             return !buffer.hasRemaining();
192         }
193 
194         public final long writtenBytes() {
195             return buffer.position() - initialPos;
196         }
197 
198         public final long totalBytes() {
199             return buffer.limit() - initialPos;
200         }
201 
202         public final long transferTo(WritableByteChannel ch) throws IOException {
203             return ch.write(buffer);
204         }
205 
206         public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
207             return ch.send(buffer, raddr);
208         }
209 
210         public void release() {
211             // Unpooled.
212         }
213     }
214 
215     final class PooledSendBuffer extends UnpooledSendBuffer {
216 
217         private final Preallocation parent;
218 
219         PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
220             super(buffer);
221             this.parent = parent;
222         }
223 
224         @Override
225         public void release() {
226             final Preallocation parent = this.parent;
227             if (-- parent.refCnt == 0) {
228                 parent.buffer.clear();
229                 if (parent != current) {
230                     poolHead = new PreallocationRef(parent, poolHead);
231                 }
232             }
233         }
234     }
235 
236     static class GatheringSendBuffer implements SendBuffer {
237 
238         private final ByteBuffer[] buffers;
239         private final int last;
240         private long written;
241         private final int total;
242 
243         GatheringSendBuffer(ByteBuffer[] buffers) {
244             this.buffers = buffers;
245             last = buffers.length - 1;
246             int total = 0;
247             for (ByteBuffer buf: buffers) {
248                 total += buf.remaining();
249             }
250             this.total = total;
251         }
252 
253         public boolean finished() {
254             return !buffers[last].hasRemaining();
255         }
256 
257         public long writtenBytes() {
258             return written;
259         }
260 
261         public long totalBytes() {
262             return total;
263         }
264 
265         public long transferTo(WritableByteChannel ch) throws IOException {
266             if (ch instanceof GatheringByteChannel) {
267                  long w = ((GatheringByteChannel) ch).write(buffers);
268                  written += w;
269                  return w;
270             } else {
271                 int send = 0;
272                 for (ByteBuffer buf: buffers) {
273                     if (buf.hasRemaining()) {
274                         int w = ch.write(buf);
275                         if (w == 0) {
276                             break;
277                         } else {
278                             send += w;
279                         }
280                     }
281                 }
282                 written += send;
283                 return send;
284             }
285         }
286 
287         public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
288             int send = 0;
289             for (ByteBuffer buf: buffers) {
290                 if (buf.hasRemaining()) {
291                     int w = ch.send(buf, raddr);
292                     if (w == 0) {
293                         break;
294                     } else {
295                         send += w;
296                     }
297                 }
298             }
299             written += send;
300 
301             return send;
302         }
303 
304         public void release() {
305             // nothing todo
306         }
307 
308     }
309 
310     final class FileSendBuffer implements SendBuffer {
311 
312         private final FileRegion file;
313         private long writtenBytes;
314 
315 
316         FileSendBuffer(FileRegion file) {
317             this.file = file;
318         }
319 
320         public boolean finished() {
321             return writtenBytes >= file.getCount();
322         }
323 
324         public long writtenBytes() {
325             return writtenBytes;
326         }
327 
328         public long totalBytes() {
329             return file.getCount();
330         }
331 
332         public long transferTo(WritableByteChannel ch) throws IOException {
333             long localWrittenBytes = file.transferTo(ch, writtenBytes);
334             writtenBytes += localWrittenBytes;
335             return localWrittenBytes;
336         }
337 
338         public long transferTo(DatagramChannel ch, SocketAddress raddr) {
339             throw new UnsupportedOperationException();
340         }
341 
342         public void release() {
343             if (file instanceof DefaultFileRegion) {
344                 if (((DefaultFileRegion) file).releaseAfterTransfer()) {
345                     // Make sure the FileRegion resource are released otherwise it may cause a FD
346                     // leak or something similar
347                     file.releaseExternalResources();
348                 }
349             }
350         }
351     }
352 
353     static final class EmptySendBuffer implements SendBuffer {
354 
355         public boolean finished() {
356             return true;
357         }
358 
359         public long writtenBytes() {
360             return 0;
361         }
362 
363         public long totalBytes() {
364             return 0;
365         }
366 
367         public long transferTo(WritableByteChannel ch) {
368             return 0;
369         }
370 
371         public long transferTo(DatagramChannel ch, SocketAddress raddr) {
372             return 0;
373         }
374 
375         public void release() {
376             // Unpooled.
377         }
378     }
379 
380     public void releaseExternalResources() {
381         if (current.buffer != null) {
382             ByteBufferUtil.destroy(current.buffer);
383         }
384     }
385 }