1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.CompositeChannelBuffer;
20 import org.jboss.netty.channel.DefaultFileRegion;
21 import org.jboss.netty.channel.FileRegion;
22 import org.jboss.netty.util.ExternalResourceReleasable;
23 import org.jboss.netty.util.internal.ByteBufferUtil;
24
25 import java.io.IOException;
26 import java.lang.ref.SoftReference;
27 import java.net.SocketAddress;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.GatheringByteChannel;
31 import java.nio.channels.WritableByteChannel;
32
33 final class SocketSendBufferPool implements ExternalResourceReleasable {
34
35 private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
36
37 private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
38 private static final int ALIGN_SHIFT = 4;
39 private static final int ALIGN_MASK = 15;
40
41 PreallocationRef poolHead;
42 Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
43
44 SendBuffer acquire(Object message) {
45 if (message instanceof ChannelBuffer) {
46 return acquire((ChannelBuffer) message);
47 } else if (message instanceof FileRegion) {
48 return acquire((FileRegion) message);
49 }
50
51 throw new IllegalArgumentException(
52 "unsupported message type: " + message.getClass());
53 }
54
55 private SendBuffer acquire(FileRegion src) {
56 if (src.getCount() == 0) {
57 return EMPTY_BUFFER;
58 }
59 return new FileSendBuffer(src);
60 }
61
62 private SendBuffer acquire(ChannelBuffer src) {
63 final int size = src.readableBytes();
64 if (size == 0) {
65 return EMPTY_BUFFER;
66 }
67
68
69 if (src instanceof CompositeChannelBuffer && ((CompositeChannelBuffer) src).useGathering()) {
70 return new GatheringSendBuffer(src.toByteBuffers());
71 }
72
73 if (src.isDirect()) {
74 return new UnpooledSendBuffer(src.toByteBuffer());
75 }
76 if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
77 return new UnpooledSendBuffer(src.toByteBuffer());
78 }
79
80 Preallocation current = this.current;
81 ByteBuffer buffer = current.buffer;
82 int remaining = buffer.remaining();
83 PooledSendBuffer dst;
84
85 if (size < remaining) {
86 int nextPos = buffer.position() + size;
87 ByteBuffer slice = buffer.duplicate();
88 buffer.position(align(nextPos));
89 slice.limit(nextPos);
90 current.refCnt ++;
91 dst = new PooledSendBuffer(current, slice);
92 } else if (size > remaining) {
93 this.current = current = getPreallocation();
94 buffer = current.buffer;
95 ByteBuffer slice = buffer.duplicate();
96 buffer.position(align(size));
97 slice.limit(size);
98 current.refCnt ++;
99 dst = new PooledSendBuffer(current, slice);
100 } else {
101 current.refCnt ++;
102 this.current = getPreallocation0();
103 dst = new PooledSendBuffer(current, current.buffer);
104 }
105
106 ByteBuffer dstbuf = dst.buffer;
107 dstbuf.mark();
108 src.getBytes(src.readerIndex(), dstbuf);
109 dstbuf.reset();
110 return dst;
111 }
112
113 private Preallocation getPreallocation() {
114 Preallocation current = this.current;
115 if (current.refCnt == 0) {
116 current.buffer.clear();
117 return current;
118 }
119
120 return getPreallocation0();
121 }
122
123 private Preallocation getPreallocation0() {
124 PreallocationRef ref = poolHead;
125 if (ref != null) {
126 do {
127 Preallocation p = ref.get();
128 ref = ref.next;
129
130 if (p != null) {
131 poolHead = ref;
132 return p;
133 }
134 } while (ref != null);
135
136 poolHead = ref;
137 }
138
139 return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
140 }
141
142 private static int align(int pos) {
143 int q = pos >>> ALIGN_SHIFT;
144 int r = pos & ALIGN_MASK;
145 if (r != 0) {
146 q ++;
147 }
148 return q << ALIGN_SHIFT;
149 }
150
151 private static final class Preallocation {
152 final ByteBuffer buffer;
153 int refCnt;
154
155 Preallocation(int capacity) {
156 buffer = ByteBuffer.allocateDirect(capacity);
157 }
158 }
159
160 private final class PreallocationRef extends SoftReference<Preallocation> {
161 final PreallocationRef next;
162
163 PreallocationRef(Preallocation prealloation, PreallocationRef next) {
164 super(prealloation);
165 this.next = next;
166 }
167 }
168
/**
 * A unit of outbound data that can be written to a channel in one or more
 * steps, with progress accounting.
 */
interface SendBuffer {

    /**
     * Writes as much pending data as the channel accepts.
     *
     * @param ch the channel to write to
     * @return the number of bytes written by this call
     * @throws IOException if the underlying write fails
     */
    long transferTo(WritableByteChannel ch) throws IOException;

    /**
     * Sends pending data through a datagram channel to the given address.
     *
     * @param ch    the datagram channel to send through
     * @param raddr the destination address
     * @return the number of bytes sent by this call
     * @throws IOException if the underlying send fails
     */
    long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;

    /** Returns {@code true} once no data remains to be transferred. */
    boolean finished();

    /** Returns the number of bytes transferred so far. */
    long writtenBytes();

    /** Returns the total number of bytes this buffer has to transfer. */
    long totalBytes();

    /** Gives back whatever resources this buffer holds; may be a no-op. */
    void release();
}
179
180 static class UnpooledSendBuffer implements SendBuffer {
181
182 final ByteBuffer buffer;
183 final int initialPos;
184
185 UnpooledSendBuffer(ByteBuffer buffer) {
186 this.buffer = buffer;
187 initialPos = buffer.position();
188 }
189
190 public final boolean finished() {
191 return !buffer.hasRemaining();
192 }
193
194 public final long writtenBytes() {
195 return buffer.position() - initialPos;
196 }
197
198 public final long totalBytes() {
199 return buffer.limit() - initialPos;
200 }
201
202 public final long transferTo(WritableByteChannel ch) throws IOException {
203 return ch.write(buffer);
204 }
205
206 public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
207 return ch.send(buffer, raddr);
208 }
209
210 public void release() {
211
212 }
213 }
214
215 final class PooledSendBuffer extends UnpooledSendBuffer {
216
217 private final Preallocation parent;
218
219 PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
220 super(buffer);
221 this.parent = parent;
222 }
223
224 @Override
225 public void release() {
226 final Preallocation parent = this.parent;
227 if (-- parent.refCnt == 0) {
228 parent.buffer.clear();
229 if (parent != current) {
230 poolHead = new PreallocationRef(parent, poolHead);
231 }
232 }
233 }
234 }
235
236 static class GatheringSendBuffer implements SendBuffer {
237
238 private final ByteBuffer[] buffers;
239 private final int last;
240 private long written;
241 private final int total;
242
243 GatheringSendBuffer(ByteBuffer[] buffers) {
244 this.buffers = buffers;
245 last = buffers.length - 1;
246 int total = 0;
247 for (ByteBuffer buf: buffers) {
248 total += buf.remaining();
249 }
250 this.total = total;
251 }
252
253 public boolean finished() {
254 return !buffers[last].hasRemaining();
255 }
256
257 public long writtenBytes() {
258 return written;
259 }
260
261 public long totalBytes() {
262 return total;
263 }
264
265 public long transferTo(WritableByteChannel ch) throws IOException {
266 if (ch instanceof GatheringByteChannel) {
267 long w = ((GatheringByteChannel) ch).write(buffers);
268 written += w;
269 return w;
270 } else {
271 int send = 0;
272 for (ByteBuffer buf: buffers) {
273 if (buf.hasRemaining()) {
274 int w = ch.write(buf);
275 if (w == 0) {
276 break;
277 } else {
278 send += w;
279 }
280 }
281 }
282 written += send;
283 return send;
284 }
285 }
286
287 public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
288 int send = 0;
289 for (ByteBuffer buf: buffers) {
290 if (buf.hasRemaining()) {
291 int w = ch.send(buf, raddr);
292 if (w == 0) {
293 break;
294 } else {
295 send += w;
296 }
297 }
298 }
299 written += send;
300
301 return send;
302 }
303
304 public void release() {
305
306 }
307
308 }
309
310 final class FileSendBuffer implements SendBuffer {
311
312 private final FileRegion file;
313 private long writtenBytes;
314
315
316 FileSendBuffer(FileRegion file) {
317 this.file = file;
318 }
319
320 public boolean finished() {
321 return writtenBytes >= file.getCount();
322 }
323
324 public long writtenBytes() {
325 return writtenBytes;
326 }
327
328 public long totalBytes() {
329 return file.getCount();
330 }
331
332 public long transferTo(WritableByteChannel ch) throws IOException {
333 long localWrittenBytes = file.transferTo(ch, writtenBytes);
334 writtenBytes += localWrittenBytes;
335 return localWrittenBytes;
336 }
337
338 public long transferTo(DatagramChannel ch, SocketAddress raddr) {
339 throw new UnsupportedOperationException();
340 }
341
342 public void release() {
343 if (file instanceof DefaultFileRegion) {
344 if (((DefaultFileRegion) file).releaseAfterTransfer()) {
345
346
347 file.releaseExternalResources();
348 }
349 }
350 }
351 }
352
353 static final class EmptySendBuffer implements SendBuffer {
354
355 public boolean finished() {
356 return true;
357 }
358
359 public long writtenBytes() {
360 return 0;
361 }
362
363 public long totalBytes() {
364 return 0;
365 }
366
367 public long transferTo(WritableByteChannel ch) {
368 return 0;
369 }
370
371 public long transferTo(DatagramChannel ch, SocketAddress raddr) {
372 return 0;
373 }
374
375 public void release() {
376
377 }
378 }
379
380 public void releaseExternalResources() {
381 if (current.buffer != null) {
382 ByteBufferUtil.destroy(current.buffer);
383 }
384 }
385 }