View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.DecoderResult;
23  import io.netty.handler.codec.ReplayingDecoder;
24  import io.netty.handler.codec.TooLongFrameException;
25  import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26  import io.netty.util.ByteProcessor;
27  import io.netty.util.internal.AppendableCharSequence;
28  import io.netty.util.internal.StringUtil;
29  import io.netty.util.internal.UnstableApi;
30  
31  import java.util.List;
32  
33  import static io.netty.buffer.ByteBufUtil.*;
34  import static io.netty.util.internal.ObjectUtil.*;
35  
36  /**
37   * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and {@link StompContentSubframe}s.
38   *
39   * <h3>Parameters to control memory consumption: </h3>
40   * {@code maxLineLength} the maximum length of line - restricts length of command and header lines If the length of the
41   * initial line exceeds this value, a {@link TooLongFrameException} will be raised.
42   * <br>
43   * {@code maxChunkSize} The maximum length of the content or each chunk.  If the content length (or the length of each
44   * chunk) exceeds this value, the content or chunk ill be split into multiple {@link StompContentSubframe}s whose length
45   * is {@code maxChunkSize} at maximum.
46   *
47   * <h3>Chunked Content</h3>
48   * <p>
49   * If the content of a stomp message is greater than {@code maxChunkSize} the transfer encoding of the HTTP message is
50   * 'chunked', this decoder generates multiple {@link StompContentSubframe} instances to avoid excessive memory
51   * consumption. Note, that every message, even with no content decodes with {@link LastStompContentSubframe} at the end
52   * to simplify upstream message parsing.
53   */
54  public class StompSubframeDecoder extends ReplayingDecoder<State> {
55  
56      private static final int DEFAULT_CHUNK_SIZE = 8132;
57      private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
58  
59      @UnstableApi
60      public enum State {
61          SKIP_CONTROL_CHARACTERS,
62          READ_HEADERS,
63          READ_CONTENT,
64          FINALIZE_FRAME_READ,
65          BAD_FRAME,
66          INVALID_CHUNK
67      }
68  
69      private final Utf8LineParser commandParser;
70      private final HeaderParser headerParser;
71      private final int maxChunkSize;
72      private int alreadyReadChunkSize;
73      private LastStompContentSubframe lastContent;
74      private long contentLength = -1;
75  
76      public StompSubframeDecoder() {
77          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
78      }
79  
80      public StompSubframeDecoder(boolean validateHeaders) {
81          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
82      }
83  
84      public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
85          this(maxLineLength, maxChunkSize, false);
86      }
87  
88      public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
89          super(State.SKIP_CONTROL_CHARACTERS);
90          checkPositive(maxLineLength, "maxLineLength");
91          checkPositive(maxChunkSize, "maxChunkSize");
92          this.maxChunkSize = maxChunkSize;
93          commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
94          headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
95      }
96  
97      @Override
98      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
99          switch (state()) {
100             case SKIP_CONTROL_CHARACTERS:
101                 skipControlCharacters(in);
102                 checkpoint(State.READ_HEADERS);
103                 // Fall through.
104             case READ_HEADERS:
105                 StompCommand command = StompCommand.UNKNOWN;
106                 StompHeadersSubframe frame = null;
107                 try {
108                     command = readCommand(in);
109                     frame = new DefaultStompHeadersSubframe(command);
110                     checkpoint(readHeaders(in, frame.headers()));
111                     out.add(frame);
112                 } catch (Exception e) {
113                     if (frame == null) {
114                         frame = new DefaultStompHeadersSubframe(command);
115                     }
116                     frame.setDecoderResult(DecoderResult.failure(e));
117                     out.add(frame);
118                     checkpoint(State.BAD_FRAME);
119                     return;
120                 }
121                 break;
122             case BAD_FRAME:
123                 in.skipBytes(actualReadableBytes());
124                 return;
125         }
126         try {
127             switch (state()) {
128                 case READ_CONTENT:
129                     int toRead = in.readableBytes();
130                     if (toRead == 0) {
131                         return;
132                     }
133                     if (toRead > maxChunkSize) {
134                         toRead = maxChunkSize;
135                     }
136                     if (contentLength >= 0) {
137                         int remainingLength = (int) (contentLength - alreadyReadChunkSize);
138                         if (toRead > remainingLength) {
139                             toRead = remainingLength;
140                         }
141                         ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
142                         if ((alreadyReadChunkSize += toRead) >= contentLength) {
143                             lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
144                             checkpoint(State.FINALIZE_FRAME_READ);
145                         } else {
146                             out.add(new DefaultStompContentSubframe(chunkBuffer));
147                             return;
148                         }
149                     } else {
150                         int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
151                         if (nulIndex == in.readerIndex()) {
152                             checkpoint(State.FINALIZE_FRAME_READ);
153                         } else {
154                             if (nulIndex > 0) {
155                                 toRead = nulIndex - in.readerIndex();
156                             } else {
157                                 toRead = in.writerIndex() - in.readerIndex();
158                             }
159                             ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
160                             alreadyReadChunkSize += toRead;
161                             if (nulIndex > 0) {
162                                 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
163                                 checkpoint(State.FINALIZE_FRAME_READ);
164                             } else {
165                                 out.add(new DefaultStompContentSubframe(chunkBuffer));
166                                 return;
167                             }
168                         }
169                     }
170                     // Fall through.
171                 case FINALIZE_FRAME_READ:
172                     skipNullCharacter(in);
173                     if (lastContent == null) {
174                         lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
175                     }
176                     out.add(lastContent);
177                     resetDecoder();
178             }
179         } catch (Exception e) {
180             if (lastContent != null) {
181                 lastContent.release();
182                 lastContent = null;
183             }
184 
185             StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
186             errorContent.setDecoderResult(DecoderResult.failure(e));
187             out.add(errorContent);
188             checkpoint(State.BAD_FRAME);
189         }
190     }
191 
192     private StompCommand readCommand(ByteBuf in) {
193         CharSequence commandSequence = commandParser.parse(in);
194         if (commandSequence == null) {
195             throw new DecoderException("Failed to read command from channel");
196         }
197         String commandStr = commandSequence.toString();
198         try {
199             return StompCommand.valueOf(commandStr);
200         } catch (IllegalArgumentException iae) {
201             throw new DecoderException("Cannot to parse command " + commandStr);
202         }
203     }
204 
205     private State readHeaders(ByteBuf buffer, StompHeaders headers) {
206         for (;;) {
207             boolean headerRead = headerParser.parseHeader(headers, buffer);
208             if (!headerRead) {
209                 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
210                     contentLength = getContentLength(headers);
211                     if (contentLength == 0) {
212                         return State.FINALIZE_FRAME_READ;
213                     }
214                 }
215                 return State.READ_CONTENT;
216             }
217         }
218     }
219 
220     private static long getContentLength(StompHeaders headers) {
221         long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
222         if (contentLength < 0) {
223             throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
224         }
225         return contentLength;
226     }
227 
228     private static void skipNullCharacter(ByteBuf buffer) {
229         byte b = buffer.readByte();
230         if (b != StompConstants.NUL) {
231             throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
232         }
233     }
234 
235     private static void skipControlCharacters(ByteBuf buffer) {
236         byte b;
237         for (;;) {
238             b = buffer.readByte();
239             if (b != StompConstants.CR && b != StompConstants.LF) {
240                 buffer.readerIndex(buffer.readerIndex() - 1);
241                 break;
242             }
243         }
244     }
245 
246     private void resetDecoder() {
247         checkpoint(State.SKIP_CONTROL_CHARACTERS);
248         contentLength = -1;
249         alreadyReadChunkSize = 0;
250         lastContent = null;
251     }
252 
253     private static class Utf8LineParser implements ByteProcessor {
254 
255         private final AppendableCharSequence charSeq;
256         private final int maxLineLength;
257 
258         private int lineLength;
259         private char interim;
260         private boolean nextRead;
261 
262         Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
263             this.charSeq = checkNotNull(charSeq, "charSeq");
264             this.maxLineLength = maxLineLength;
265         }
266 
267         AppendableCharSequence parse(ByteBuf byteBuf) {
268             reset();
269             int offset = byteBuf.forEachByte(this);
270             if (offset == -1) {
271                 return null;
272             }
273 
274             byteBuf.readerIndex(offset + 1);
275             return charSeq;
276         }
277 
278         AppendableCharSequence charSequence() {
279             return charSeq;
280         }
281 
282         @Override
283         public boolean process(byte nextByte) throws Exception {
284             if (nextByte == StompConstants.CR) {
285                 ++lineLength;
286                 return true;
287             }
288 
289             if (nextByte == StompConstants.LF) {
290                 return false;
291             }
292 
293             if (++lineLength > maxLineLength) {
294                 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
295             }
296 
297             // 1 byte   -   0xxxxxxx                    -  7 bits
298             // 2 byte   -   110xxxxx 10xxxxxx           -  11 bits
299             // 3 byte   -   1110xxxx 10xxxxxx 10xxxxxx  -  16 bits
300             if (nextRead) {
301                 interim |= (nextByte & 0x3F) << 6;
302                 nextRead = false;
303             } else if (interim != 0) { // flush 2 or 3 byte
304                 charSeq.append((char) (interim | (nextByte & 0x3F)));
305                 interim = 0;
306             } else if (nextByte >= 0) { // INITIAL BRANCH
307                 // The first 128 characters (US-ASCII) need one byte.
308                 charSeq.append((char) nextByte);
309             } else if ((nextByte & 0xE0) == 0xC0) {
310                 // The next 1920 characters need two bytes and we can define
311                 // a first byte by mask 110xxxxx.
312                 interim = (char) ((nextByte & 0x1F) << 6);
313             } else {
314                 // The rest of characters need three bytes.
315                 interim = (char) ((nextByte & 0x0F) << 12);
316                 nextRead = true;
317             }
318 
319             return true;
320         }
321 
322         protected void reset() {
323             charSeq.reset();
324             lineLength = 0;
325             interim = 0;
326             nextRead = false;
327         }
328     }
329 
330     private static final class HeaderParser extends Utf8LineParser {
331 
332         private final boolean validateHeaders;
333 
334         private String name;
335         private boolean valid;
336 
337         HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
338             super(charSeq, maxLineLength);
339             this.validateHeaders = validateHeaders;
340         }
341 
342         boolean parseHeader(StompHeaders headers, ByteBuf buf) {
343             AppendableCharSequence value = super.parse(buf);
344             if (value == null || (name == null && value.length() == 0)) {
345                 return false;
346             }
347 
348             if (valid) {
349                 headers.add(name, value.toString());
350             } else if (validateHeaders) {
351                 if (StringUtil.isNullOrEmpty(name)) {
352                     throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
353                 }
354                 String line = name + ':' + value;
355                 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
356                                                    + ", " + line);
357             }
358             return true;
359         }
360 
361         @Override
362         public boolean process(byte nextByte) throws Exception {
363             if (nextByte == StompConstants.COLON) {
364                 if (name == null) {
365                     AppendableCharSequence charSeq = charSequence();
366                     if (charSeq.length() != 0) {
367                         name = charSeq.substring(0, charSeq.length());
368                         charSeq.reset();
369                         valid = true;
370                         return true;
371                     } else {
372                         name = StringUtil.EMPTY_STRING;
373                     }
374                 } else {
375                     valid = false;
376                 }
377             }
378 
379             return super.process(nextByte);
380         }
381 
382         @Override
383         protected void reset() {
384             name = null;
385             valid = false;
386             super.reset();
387         }
388     }
389 }