1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.buffer.ChannelBuffers.*;
19
20 import java.nio.ByteBuffer;
21 import java.nio.channels.ReadableByteChannel;
22
23 import org.jboss.netty.buffer.ChannelBuffer;
24
25
26
27
28
29
30 public class ChunkedNioStream implements ChunkedInput {
31
32 private final ReadableByteChannel in;
33
34 private final int chunkSize;
35 private long offset;
36
37
38
39
40 private final ByteBuffer byteBuffer;
41
42
43
44
45 public ChunkedNioStream(ReadableByteChannel in) {
46 this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
47 }
48
49
50
51
52
53
54
55 public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
56 if (in == null) {
57 throw new NullPointerException("in");
58 }
59 if (chunkSize <= 0) {
60 throw new IllegalArgumentException("chunkSize: " + chunkSize +
61 " (expected: a positive integer)");
62 }
63 this.in = in;
64 offset = 0;
65 this.chunkSize = chunkSize;
66 byteBuffer = ByteBuffer.allocate(chunkSize);
67 }
68
69
70
71
72 public long getTransferredBytes() {
73 return offset;
74 }
75
76 public boolean hasNextChunk() throws Exception {
77 if (byteBuffer.position() > 0) {
78
79 return true;
80 }
81 if (in.isOpen()) {
82
83 int b = in.read(byteBuffer);
84 if (b < 0) {
85 return false;
86 } else {
87 offset += b;
88 return true;
89 }
90 }
91 return false;
92 }
93
94 public boolean isEndOfInput() throws Exception {
95 return !hasNextChunk();
96 }
97
98 public void close() throws Exception {
99 in.close();
100 }
101
102 public Object nextChunk() throws Exception {
103 if (!hasNextChunk()) {
104 return null;
105 }
106
107 int readBytes = byteBuffer.position();
108 for (;;) {
109 int localReadBytes = in.read(byteBuffer);
110 if (localReadBytes < 0) {
111 break;
112 }
113 readBytes += localReadBytes;
114 offset += localReadBytes;
115
116 if (readBytes == chunkSize) {
117 break;
118 }
119 }
120 byteBuffer.flip();
121
122 ChannelBuffer buffer = copiedBuffer(byteBuffer);
123 byteBuffer.clear();
124 return buffer;
125 }
126 }