View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.DecoderResult;
23  import io.netty.handler.codec.ReplayingDecoder;
24  import io.netty.handler.codec.TooLongFrameException;
25  import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26  import io.netty.util.ByteProcessor;
27  import io.netty.util.internal.AppendableCharSequence;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.List;
31  
32  import static io.netty.buffer.ByteBufUtil.*;
33  import static io.netty.util.internal.ObjectUtil.*;
34  
35  /**
36   * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and {@link StompContentSubframe}s.
37   *
38   * <h3>Parameters to control memory consumption: </h3>
39   * {@code maxLineLength} the maximum length of line - restricts length of command and header lines If the length of the
40   * initial line exceeds this value, a {@link TooLongFrameException} will be raised.
41   * <br>
42   * {@code maxChunkSize} The maximum length of the content or each chunk.  If the content length (or the length of each
43   * chunk) exceeds this value, the content or chunk ill be split into multiple {@link StompContentSubframe}s whose length
44   * is {@code maxChunkSize} at maximum.
45   *
46   * <h3>Chunked Content</h3>
47   * <p>
48   * If the content of a stomp message is greater than {@code maxChunkSize} the transfer encoding of the HTTP message is
49   * 'chunked', this decoder generates multiple {@link StompContentSubframe} instances to avoid excessive memory
50   * consumption. Note, that every message, even with no content decodes with {@link LastStompContentSubframe} at the end
51   * to simplify upstream message parsing.
52   */
53  public class StompSubframeDecoder extends ReplayingDecoder<State> {
54  
55      private static final int DEFAULT_CHUNK_SIZE = 8132;
56      private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
57  
58      /**
59       * @deprecated this should never be used by an user!
60       */
61      @Deprecated
62      public enum State {
63          SKIP_CONTROL_CHARACTERS,
64          READ_HEADERS,
65          READ_CONTENT,
66          FINALIZE_FRAME_READ,
67          BAD_FRAME,
68          INVALID_CHUNK
69      }
70  
71      private final Utf8LineParser commandParser;
72      private final HeaderParser headerParser;
73      private final int maxChunkSize;
74      private int alreadyReadChunkSize;
75      private LastStompContentSubframe lastContent;
76      private long contentLength = -1;
77  
78      public StompSubframeDecoder() {
79          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
80      }
81  
82      public StompSubframeDecoder(boolean validateHeaders) {
83          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
84      }
85  
86      public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
87          this(maxLineLength, maxChunkSize, false);
88      }
89  
90      public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
91          super(State.SKIP_CONTROL_CHARACTERS);
92          checkPositive(maxLineLength, "maxLineLength");
93          checkPositive(maxChunkSize, "maxChunkSize");
94          this.maxChunkSize = maxChunkSize;
95          commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
96          headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
97      }
98  
99      @Override
100     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
101         switch (state()) {
102             case SKIP_CONTROL_CHARACTERS:
103                 skipControlCharacters(in);
104                 checkpoint(State.READ_HEADERS);
105                 // Fall through.
106             case READ_HEADERS:
107                 StompCommand command = StompCommand.UNKNOWN;
108                 StompHeadersSubframe frame = null;
109                 try {
110                     command = readCommand(in);
111                     frame = new DefaultStompHeadersSubframe(command);
112                     checkpoint(readHeaders(in, frame));
113                     out.add(frame);
114                 } catch (Exception e) {
115                     if (frame == null) {
116                         frame = new DefaultStompHeadersSubframe(command);
117                     }
118                     frame.setDecoderResult(DecoderResult.failure(e));
119                     out.add(frame);
120                     checkpoint(State.BAD_FRAME);
121                     return;
122                 }
123                 break;
124             case BAD_FRAME:
125                 in.skipBytes(actualReadableBytes());
126                 return;
127         }
128         try {
129             switch (state()) {
130                 case READ_CONTENT:
131                     int toRead = in.readableBytes();
132                     if (toRead == 0) {
133                         return;
134                     }
135                     if (toRead > maxChunkSize) {
136                         toRead = maxChunkSize;
137                     }
138                     if (contentLength >= 0) {
139                         int remainingLength = (int) (contentLength - alreadyReadChunkSize);
140                         if (toRead > remainingLength) {
141                             toRead = remainingLength;
142                         }
143                         ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
144                         if ((alreadyReadChunkSize += toRead) >= contentLength) {
145                             lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
146                             checkpoint(State.FINALIZE_FRAME_READ);
147                         } else {
148                             out.add(new DefaultStompContentSubframe(chunkBuffer));
149                             return;
150                         }
151                     } else {
152                         int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
153                         if (nulIndex == in.readerIndex()) {
154                             checkpoint(State.FINALIZE_FRAME_READ);
155                         } else {
156                             if (nulIndex > 0) {
157                                 toRead = nulIndex - in.readerIndex();
158                             } else {
159                                 toRead = in.writerIndex() - in.readerIndex();
160                             }
161                             ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
162                             alreadyReadChunkSize += toRead;
163                             if (nulIndex > 0) {
164                                 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
165                                 checkpoint(State.FINALIZE_FRAME_READ);
166                             } else {
167                                 out.add(new DefaultStompContentSubframe(chunkBuffer));
168                                 return;
169                             }
170                         }
171                     }
172                     // Fall through.
173                 case FINALIZE_FRAME_READ:
174                     skipNullCharacter(in);
175                     if (lastContent == null) {
176                         lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
177                     }
178                     out.add(lastContent);
179                     resetDecoder();
180             }
181         } catch (Exception e) {
182             if (lastContent != null) {
183                 lastContent.release();
184                 lastContent = null;
185             }
186 
187             StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
188             errorContent.setDecoderResult(DecoderResult.failure(e));
189             out.add(errorContent);
190             checkpoint(State.BAD_FRAME);
191         }
192     }
193 
194     private StompCommand readCommand(ByteBuf in) {
195         CharSequence commandSequence = commandParser.parse(in);
196         if (commandSequence == null) {
197             throw new DecoderException("Failed to read command from channel");
198         }
199         String commandStr = commandSequence.toString();
200         try {
201             return StompCommand.valueOf(commandStr);
202         } catch (IllegalArgumentException iae) {
203             throw new DecoderException("Cannot to parse command " + commandStr);
204         }
205     }
206 
207     private State readHeaders(ByteBuf buffer, StompHeadersSubframe headersSubframe) {
208         StompHeaders headers = headersSubframe.headers();
209         for (;;) {
210             boolean headerRead = headerParser.parseHeader(headersSubframe, buffer);
211             if (!headerRead) {
212                 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
213                     contentLength = getContentLength(headers);
214                     if (contentLength == 0) {
215                         return State.FINALIZE_FRAME_READ;
216                     }
217                 }
218                 return State.READ_CONTENT;
219             }
220         }
221     }
222 
223     private static long getContentLength(StompHeaders headers) {
224         long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
225         if (contentLength < 0) {
226             throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
227         }
228         return contentLength;
229     }
230 
231     private static void skipNullCharacter(ByteBuf buffer) {
232         byte b = buffer.readByte();
233         if (b != StompConstants.NUL) {
234             throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
235         }
236     }
237 
238     private static void skipControlCharacters(ByteBuf buffer) {
239         byte b;
240         for (;;) {
241             b = buffer.readByte();
242             if (b != StompConstants.CR && b != StompConstants.LF) {
243                 buffer.readerIndex(buffer.readerIndex() - 1);
244                 break;
245             }
246         }
247     }
248 
249     private void resetDecoder() {
250         checkpoint(State.SKIP_CONTROL_CHARACTERS);
251         contentLength = -1;
252         alreadyReadChunkSize = 0;
253         lastContent = null;
254     }
255 
256     private static class Utf8LineParser implements ByteProcessor {
257 
258         private final AppendableCharSequence charSeq;
259         private final int maxLineLength;
260 
261         private int lineLength;
262         private char interim;
263         private boolean nextRead;
264 
265         Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
266             this.charSeq = checkNotNull(charSeq, "charSeq");
267             this.maxLineLength = maxLineLength;
268         }
269 
270         AppendableCharSequence parse(ByteBuf byteBuf) {
271             reset();
272             int offset = byteBuf.forEachByte(this);
273             if (offset == -1) {
274                 return null;
275             }
276 
277             byteBuf.readerIndex(offset + 1);
278             return charSeq;
279         }
280 
281         AppendableCharSequence charSequence() {
282             return charSeq;
283         }
284 
285         @Override
286         public boolean process(byte nextByte) throws Exception {
287             if (nextByte == StompConstants.CR) {
288                 ++lineLength;
289                 return true;
290             }
291 
292             if (nextByte == StompConstants.LF) {
293                 return false;
294             }
295 
296             if (++lineLength > maxLineLength) {
297                 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
298             }
299 
300             // 1 byte   -   0xxxxxxx                    -  7 bits
301             // 2 byte   -   110xxxxx 10xxxxxx           -  11 bits
302             // 3 byte   -   1110xxxx 10xxxxxx 10xxxxxx  -  16 bits
303             if (nextRead) {
304                 interim |= (nextByte & 0x3F) << 6;
305                 nextRead = false;
306             } else if (interim != 0) { // flush 2 or 3 byte
307                 appendTo(charSeq, (char) (interim | (nextByte & 0x3F)));
308                 interim = 0;
309             } else if (nextByte >= 0) { // INITIAL BRANCH
310                 // The first 128 characters (US-ASCII) need one byte.
311                 appendTo(charSeq, (char) nextByte);
312             } else if ((nextByte & 0xE0) == 0xC0) {
313                 // The next 1920 characters need two bytes and we can define
314                 // a first byte by mask 110xxxxx.
315                 interim = (char) ((nextByte & 0x1F) << 6);
316             } else {
317                 // The rest of characters need three bytes.
318                 interim = (char) ((nextByte & 0x0F) << 12);
319                 nextRead = true;
320             }
321 
322             return true;
323         }
324 
325         protected void appendTo(AppendableCharSequence charSeq, char chr) {
326             charSeq.append(chr);
327         }
328 
329         protected void reset() {
330             charSeq.reset();
331             lineLength = 0;
332             interim = 0;
333             nextRead = false;
334         }
335     }
336 
337     private static final class HeaderParser extends Utf8LineParser {
338 
339         private final boolean validateHeaders;
340 
341         private String name;
342         private boolean valid;
343 
344         private boolean shouldUnescape;
345         private boolean unescapeInProgress;
346 
347         HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
348             super(charSeq, maxLineLength);
349             this.validateHeaders = validateHeaders;
350         }
351 
352         boolean parseHeader(StompHeadersSubframe headersSubframe, ByteBuf buf) {
353             shouldUnescape = shouldUnescape(headersSubframe.command());
354             AppendableCharSequence value = super.parse(buf);
355             if (value == null || (name == null && value.length() == 0)) {
356                 return false;
357             }
358 
359             if (valid) {
360                 headersSubframe.headers().add(name, value.toString());
361             } else if (validateHeaders) {
362                 if (StringUtil.isNullOrEmpty(name)) {
363                     throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
364                 }
365                 String line = name + ':' + value;
366                 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
367                                                    + ", " + line);
368             }
369             return true;
370         }
371 
372         @Override
373         public boolean process(byte nextByte) throws Exception {
374             if (nextByte == StompConstants.COLON) {
375                 if (name == null) {
376                     AppendableCharSequence charSeq = charSequence();
377                     if (charSeq.length() != 0) {
378                         name = charSeq.substring(0, charSeq.length());
379                         charSeq.reset();
380                         valid = true;
381                         return true;
382                     } else {
383                         name = StringUtil.EMPTY_STRING;
384                     }
385                 } else {
386                     valid = false;
387                 }
388             }
389 
390             return super.process(nextByte);
391         }
392 
393         @Override
394         protected void appendTo(AppendableCharSequence charSeq, char chr) {
395             if (!shouldUnescape) {
396                 super.appendTo(charSeq, chr);
397                 return;
398             }
399 
400             if (chr == '\\') {
401                 if (unescapeInProgress) {
402                     super.appendTo(charSeq, chr);
403                     unescapeInProgress = false;
404                 } else {
405                     unescapeInProgress = true;
406                 }
407                 return;
408             }
409 
410             if (unescapeInProgress) {
411                 if (chr == 'c') {
412                     charSeq.append(':');
413                 } else if (chr == 'r') {
414                     charSeq.append('\r');
415                 } else if (chr == 'n') {
416                     charSeq.append('\n');
417                 } else {
418                     charSeq.append('\\').append(chr);
419                     throw new IllegalArgumentException("received an invalid escape header sequence '" + charSeq + '\'');
420                 }
421 
422                 unescapeInProgress = false;
423                 return;
424             }
425 
426             super.appendTo(charSeq, chr);
427         }
428 
429         @Override
430         protected void reset() {
431             name = null;
432             valid = false;
433             unescapeInProgress = false;
434             super.reset();
435         }
436 
437         private static boolean shouldUnescape(StompCommand command) {
438             return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
439         }
440     }
441 }