View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.buffer;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.util.Send;
20  import io.netty5.util.internal.StringUtil;
21  
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.EOFException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  
28  import static java.util.Objects.requireNonNull;
29  
30  /**
31   * An {@link InputStream} which reads data from a {@link Buffer}.
32   * <p>
33   * A read operation against this stream will occur at the {@code readerOffset}
34   * of its underlying buffer and the {@code readerOffset} will increase during
35   * the read operation.  Please note that it only reads up to the number of
36   * readable bytes determined at the moment of construction.  Therefore,
37   * updating {@link Buffer#writerOffset()} will not affect the return
38   * value of {@link #available()}.
39   * <p>
40   * This stream implements {@link DataInput} for your convenience.
41   * The endianness of the stream is always big endian.
42   *
43   * @see BufferOutputStream
44   */
45  public final class BufferInputStream extends InputStream implements DataInput {
46      private final Buffer buffer;
47      private final int startIndex;
48      private final int endIndex;
49      private boolean closed;
50      private int markReaderOffset;
51  
52      /**
53       * Creates a new stream which reads data from the specified {@code buffer} starting at the current
54       * {@code readerOffset} and ending at the current {@code writerOffset}.
55       * <p>
56       * When this {@link BufferInputStream} is {@linkplain #close() closed, then the sent buffer will also be closed.
57       *
58       * @param buffer The buffer which provides the content for this {@link InputStream}.
59       */
60      public BufferInputStream(Send<Buffer> buffer) {
61          this.buffer = requireNonNull(buffer, "buffer").receive();
62          int readableBytes = this.buffer.readableBytes();
63          startIndex = this.buffer.readerOffset();
64          endIndex = startIndex + readableBytes;
65          markReaderOffset = startIndex;
66      }
67  
68      /**
69       * Returns the number of read bytes by this stream so far.
70       */
71      public int readBytes() {
72          return buffer.readerOffset() - startIndex;
73      }
74  
75      @Override
76      public void close() throws IOException {
77          if (closed) {
78              // The Closable interface says "If the stream is already closed then invoking this method has no effect."
79              return;
80          }
81          try (buffer) {
82              closed = true;
83              super.close();
84          }
85      }
86  
87      @Override
88      public int available() throws IOException {
89          return Math.max(0, endIndex - buffer.readerOffset());
90      }
91  
92      // Suppress a warning since the class is not thread-safe
93      @Override
94      public void mark(int readlimit) { // lgtm[java/non-sync-override]
95          markReaderOffset = buffer.readerOffset();
96      }
97  
98      @Override
99      public boolean markSupported() {
100         return true;
101     }
102 
103     @Override
104     public int read() throws IOException {
105         checkOpen();
106         int available = available();
107         if (available == 0) {
108             return -1;
109         }
110         return buffer.readByte() & 0xff;
111     }
112 
113     @Override
114     public int read(byte[] b, int off, int len) throws IOException {
115         checkOpen();
116         int available = available();
117         if (available == 0) {
118             return -1;
119         }
120 
121         len = Math.min(available, len);
122         buffer.readBytes(b, off, len);
123         return len;
124     }
125 
126     // Suppress a warning since the class is not thread-safe
127     @Override
128     public void reset() throws IOException { // lgtm[java/non-sync-override]
129         buffer.readerOffset(markReaderOffset);
130     }
131 
132     @Override
133     public long skip(long n) throws IOException {
134         return skipBytes((int) Math.min(Integer.MAX_VALUE, n));
135     }
136 
137     @Override
138     public boolean readBoolean() throws IOException {
139         checkAvailable(1);
140         return read() != 0;
141     }
142 
143     @Override
144     public byte readByte() throws IOException {
145         int available = available();
146         if (available == 0) {
147             throw new EOFException();
148         }
149         return buffer.readByte();
150     }
151 
152     @Override
153     public char readChar() throws IOException {
154         return (char) readShort();
155     }
156 
157     @Override
158     public double readDouble() throws IOException {
159         return Double.longBitsToDouble(readLong());
160     }
161 
162     @Override
163     public float readFloat() throws IOException {
164         return Float.intBitsToFloat(readInt());
165     }
166 
167     @Override
168     public void readFully(byte[] b) throws IOException {
169         readFully(b, 0, b.length);
170     }
171 
172     @Override
173     public void readFully(byte[] b, int off, int len) throws IOException {
174         checkAvailable(len);
175         buffer.readBytes(b, off, len);
176     }
177 
178     @Override
179     public int readInt() throws IOException {
180         checkAvailable(4);
181         return buffer.readInt();
182     }
183 
184     private StringBuilder lineBuf;
185 
186     @Override
187     public String readLine() throws IOException {
188         int available = available();
189         if (available == 0) {
190             return null;
191         }
192 
193         if (lineBuf != null) {
194             lineBuf.setLength(0);
195         }
196 
197         loop: do {
198             int c = buffer.readUnsignedByte();
199             --available;
200             switch (c) {
201                 case '\n':
202                     break loop;
203 
204                 case '\r':
205                     if (available > 0 && (char) buffer.getUnsignedByte(buffer.readerOffset()) == '\n') {
206                         buffer.skipReadableBytes(1);
207                     }
208                     break loop;
209 
210                 default:
211                     if (lineBuf == null) {
212                         lineBuf = new StringBuilder();
213                     }
214                     lineBuf.append((char) c);
215             }
216         } while (available > 0);
217 
218         return lineBuf != null && lineBuf.length() > 0 ? lineBuf.toString() : StringUtil.EMPTY_STRING;
219     }
220 
221     @Override
222     public long readLong() throws IOException {
223         checkAvailable(8);
224         return buffer.readLong();
225     }
226 
227     @Override
228     public short readShort() throws IOException {
229         checkAvailable(2);
230         return buffer.readShort();
231     }
232 
233     @Override
234     public String readUTF() throws IOException {
235         return DataInputStream.readUTF(this);
236     }
237 
238     @Override
239     public int readUnsignedByte() throws IOException {
240         return readByte() & 0xff;
241     }
242 
243     @Override
244     public int readUnsignedShort() throws IOException {
245         return readShort() & 0xffff;
246     }
247 
248     @Override
249     public int skipBytes(int n) throws IOException {
250         int nBytes = Math.min(available(), n);
251         buffer.skipReadableBytes(nBytes);
252         return nBytes;
253     }
254 
255     /**
256      * Expose the internally owned buffer.
257      * Use only for testing purpose.
258      *
259      * @return The internal buffer instance.
260      */
261     Buffer buffer() {
262         return buffer;
263     }
264 
265     private void checkOpen() throws IOException {
266         if (closed) {
267             throw new IOException("Stream closed");
268         }
269     }
270 
271     private void checkAvailable(int fieldSize) throws IOException {
272         checkOpen();
273         if (fieldSize < 0) {
274             throw new IndexOutOfBoundsException("fieldSize cannot be a negative number");
275         }
276         if (fieldSize > available()) {
277             throw new EOFException("fieldSize is too long! Length is " + fieldSize
278                     + ", but maximum is " + available());
279         }
280     }
281 }