View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.DecoderResult;
23  import io.netty.handler.codec.ReplayingDecoder;
24  import io.netty.handler.codec.TooLongFrameException;
25  import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26  import io.netty.util.ByteProcessor;
27  import io.netty.util.internal.AppendableCharSequence;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.List;
31  
32  import static io.netty.buffer.ByteBufUtil.*;
33  import static io.netty.util.internal.ObjectUtil.*;
34  
35  /**
36   * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and {@link StompContentSubframe}s.
37   *
38   * <h3>Parameters to control memory consumption: </h3>
39   * {@code maxLineLength} the maximum length of line - restricts length of command and header lines If the length of the
40   * initial line exceeds this value, a {@link TooLongFrameException} will be raised.
41   * <br>
42   * {@code maxChunkSize} The maximum length of the content or each chunk.  If the content length (or the length of each
43   * chunk) exceeds this value, the content or chunk ill be split into multiple {@link StompContentSubframe}s whose length
44   * is {@code maxChunkSize} at maximum.
45   *
46   * <h3>Chunked Content</h3>
47   * <p>
48   * If the content of a stomp message is greater than {@code maxChunkSize} the transfer encoding of the HTTP message is
49   * 'chunked', this decoder generates multiple {@link StompContentSubframe} instances to avoid excessive memory
50   * consumption. Note, that every message, even with no content decodes with {@link LastStompContentSubframe} at the end
51   * to simplify upstream message parsing.
52   */
53  public class StompSubframeDecoder extends ReplayingDecoder<State> {
54  
55      private static final int DEFAULT_CHUNK_SIZE = 8132;
56      private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
57  
58      /**
59       * @deprecated this should never be used by an user!
60       */
61      @Deprecated
62      public enum State {
63          SKIP_CONTROL_CHARACTERS,
64          READ_HEADERS,
65          READ_CONTENT,
66          FINALIZE_FRAME_READ,
67          BAD_FRAME,
68          INVALID_CHUNK
69      }
70  
71      private final Utf8LineParser commandParser;
72      private final HeaderParser headerParser;
73      private final int maxChunkSize;
74      private int alreadyReadChunkSize;
75      private LastStompContentSubframe lastContent;
76      private long contentLength = -1;
77  
78      public StompSubframeDecoder() {
79          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
80      }
81  
82      public StompSubframeDecoder(boolean validateHeaders) {
83          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
84      }
85  
86      public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
87          this(maxLineLength, maxChunkSize, false);
88      }
89  
90      public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
91          super(State.SKIP_CONTROL_CHARACTERS);
92          checkPositive(maxLineLength, "maxLineLength");
93          checkPositive(maxChunkSize, "maxChunkSize");
94          this.maxChunkSize = maxChunkSize;
95          commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
96          headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
97      }
98  
99      @Override
100     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
101         switch (state()) {
102             case SKIP_CONTROL_CHARACTERS:
103                 skipControlCharacters(in);
104                 checkpoint(State.READ_HEADERS);
105                 // Fall through.
106             case READ_HEADERS:
107                 StompCommand command = StompCommand.UNKNOWN;
108                 StompHeadersSubframe frame = null;
109                 try {
110                     command = readCommand(in);
111                     frame = new DefaultStompHeadersSubframe(command);
112                     checkpoint(readHeaders(in, frame));
113                     out.add(frame);
114                 } catch (Exception e) {
115                     if (frame == null) {
116                         frame = new DefaultStompHeadersSubframe(command);
117                     }
118                     frame.setDecoderResult(DecoderResult.failure(e));
119                     out.add(frame);
120                     checkpoint(State.BAD_FRAME);
121                     return;
122                 }
123                 break;
124             case BAD_FRAME:
125                 in.skipBytes(actualReadableBytes());
126                 return;
127         }
128         try {
129             switch (state()) {
130                 case READ_CONTENT:
131                     int toRead = in.readableBytes();
132                     if (toRead == 0) {
133                         return;
134                     }
135                     if (toRead > maxChunkSize) {
136                         toRead = maxChunkSize;
137                     }
138                     if (contentLength >= 0) {
139                         int remainingLength = (int) (contentLength - alreadyReadChunkSize);
140                         if (toRead > remainingLength) {
141                             toRead = remainingLength;
142                         }
143                         ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
144                         if ((alreadyReadChunkSize += toRead) >= contentLength) {
145                             lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
146                             checkpoint(State.FINALIZE_FRAME_READ);
147                         } else {
148                             out.add(new DefaultStompContentSubframe(chunkBuffer));
149                             return;
150                         }
151                     } else {
152                         int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
153                         if (nulIndex == in.readerIndex()) {
154                             checkpoint(State.FINALIZE_FRAME_READ);
155                         } else {
156                             if (nulIndex > 0) {
157                                 toRead = nulIndex - in.readerIndex();
158                             } else {
159                                 toRead = in.writerIndex() - in.readerIndex();
160                             }
161                             ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
162                             alreadyReadChunkSize += toRead;
163                             if (nulIndex > 0) {
164                                 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
165                                 checkpoint(State.FINALIZE_FRAME_READ);
166                             } else {
167                                 out.add(new DefaultStompContentSubframe(chunkBuffer));
168                                 return;
169                             }
170                         }
171                     }
172                     // Fall through.
173                 case FINALIZE_FRAME_READ:
174                     skipNullCharacter(in);
175                     if (lastContent == null) {
176                         lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
177                     }
178                     out.add(lastContent);
179                     resetDecoder();
180             }
181         } catch (Exception e) {
182             if (lastContent != null) {
183                 lastContent.release();
184                 lastContent = null;
185             }
186 
187             StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
188             errorContent.setDecoderResult(DecoderResult.failure(e));
189             out.add(errorContent);
190             checkpoint(State.BAD_FRAME);
191         }
192     }
193 
194     private StompCommand readCommand(ByteBuf in) {
195         CharSequence commandSequence = commandParser.parse(in);
196         if (commandSequence == null) {
197             throw new DecoderException("Failed to read command from channel");
198         }
199         String commandStr = commandSequence.toString();
200         try {
201             return StompCommand.valueOf(commandStr);
202         } catch (IllegalArgumentException iae) {
203             throw new DecoderException("Cannot to parse command " + commandStr);
204         }
205     }
206 
207     private State readHeaders(ByteBuf buffer, StompHeadersSubframe headersSubframe) {
208         StompHeaders headers = headersSubframe.headers();
209         for (;;) {
210             boolean headerRead = headerParser.parseHeader(headersSubframe, buffer);
211             if (!headerRead) {
212                 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
213                     contentLength = getContentLength(headers);
214                     if (contentLength == 0) {
215                         return State.FINALIZE_FRAME_READ;
216                     }
217                 }
218                 return State.READ_CONTENT;
219             }
220         }
221     }
222 
223     private static long getContentLength(StompHeaders headers) {
224         long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
225         if (contentLength < 0) {
226             throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
227         }
228         return contentLength;
229     }
230 
231     private static void skipNullCharacter(ByteBuf buffer) {
232         byte b = buffer.readByte();
233         if (b != StompConstants.NUL) {
234             throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
235         }
236     }
237 
238     private static void skipControlCharacters(ByteBuf buffer) {
239         byte b;
240         for (;;) {
241             if (!buffer.isReadable()) {
242                 return;
243             }
244             b = buffer.readByte();
245             if (b != StompConstants.CR && b != StompConstants.LF) {
246                 buffer.readerIndex(buffer.readerIndex() - 1);
247                 break;
248             }
249         }
250     }
251 
252     private void resetDecoder() {
253         checkpoint(State.SKIP_CONTROL_CHARACTERS);
254         contentLength = -1;
255         alreadyReadChunkSize = 0;
256         lastContent = null;
257     }
258 
259     private static class Utf8LineParser implements ByteProcessor {
260 
261         private final AppendableCharSequence charSeq;
262         private final int maxLineLength;
263 
264         private int lineLength;
265         private char interim;
266         private boolean nextRead;
267 
268         Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
269             this.charSeq = checkNotNull(charSeq, "charSeq");
270             this.maxLineLength = maxLineLength;
271         }
272 
273         AppendableCharSequence parse(ByteBuf byteBuf) {
274             reset();
275             int offset = byteBuf.forEachByte(this);
276             if (offset == -1) {
277                 return null;
278             }
279 
280             byteBuf.readerIndex(offset + 1);
281             return charSeq;
282         }
283 
284         AppendableCharSequence charSequence() {
285             return charSeq;
286         }
287 
288         @Override
289         public boolean process(byte nextByte) throws Exception {
290             if (nextByte == StompConstants.CR) {
291                 ++lineLength;
292                 return true;
293             }
294 
295             if (nextByte == StompConstants.LF) {
296                 return false;
297             }
298 
299             if (++lineLength > maxLineLength) {
300                 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
301             }
302 
303             // 1 byte   -   0xxxxxxx                    -  7 bits
304             // 2 byte   -   110xxxxx 10xxxxxx           -  11 bits
305             // 3 byte   -   1110xxxx 10xxxxxx 10xxxxxx  -  16 bits
306             if (nextRead) {
307                 interim |= (nextByte & 0x3F) << 6;
308                 nextRead = false;
309             } else if (interim != 0) { // flush 2 or 3 byte
310                 appendTo(charSeq, (char) (interim | (nextByte & 0x3F)));
311                 interim = 0;
312             } else if (nextByte >= 0) { // INITIAL BRANCH
313                 // The first 128 characters (US-ASCII) need one byte.
314                 appendTo(charSeq, (char) nextByte);
315             } else if ((nextByte & 0xE0) == 0xC0) {
316                 // The next 1920 characters need two bytes and we can define
317                 // a first byte by mask 110xxxxx.
318                 interim = (char) ((nextByte & 0x1F) << 6);
319             } else {
320                 // The rest of characters need three bytes.
321                 interim = (char) ((nextByte & 0x0F) << 12);
322                 nextRead = true;
323             }
324 
325             return true;
326         }
327 
328         protected void appendTo(AppendableCharSequence charSeq, char chr) {
329             charSeq.append(chr);
330         }
331 
332         protected void reset() {
333             charSeq.reset();
334             lineLength = 0;
335             interim = 0;
336             nextRead = false;
337         }
338     }
339 
340     private static final class HeaderParser extends Utf8LineParser {
341 
342         private final boolean validateHeaders;
343 
344         private String name;
345         private boolean valid;
346 
347         private boolean shouldUnescape;
348         private boolean unescapeInProgress;
349 
350         HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
351             super(charSeq, maxLineLength);
352             this.validateHeaders = validateHeaders;
353         }
354 
355         boolean parseHeader(StompHeadersSubframe headersSubframe, ByteBuf buf) {
356             shouldUnescape = shouldUnescape(headersSubframe.command());
357             AppendableCharSequence value = super.parse(buf);
358             if (value == null || (name == null && value.length() == 0)) {
359                 return false;
360             }
361 
362             if (valid) {
363                 headersSubframe.headers().add(name, value.toString());
364             } else if (validateHeaders) {
365                 if (StringUtil.isNullOrEmpty(name)) {
366                     throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
367                 }
368                 String line = name + ':' + value;
369                 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
370                                                    + ", " + line);
371             }
372             return true;
373         }
374 
375         @Override
376         public boolean process(byte nextByte) throws Exception {
377             if (nextByte == StompConstants.COLON) {
378                 if (name == null) {
379                     AppendableCharSequence charSeq = charSequence();
380                     if (charSeq.length() != 0) {
381                         name = charSeq.substring(0, charSeq.length());
382                         charSeq.reset();
383                         valid = true;
384                         return true;
385                     } else {
386                         name = StringUtil.EMPTY_STRING;
387                     }
388                 } else {
389                     valid = false;
390                 }
391             }
392 
393             return super.process(nextByte);
394         }
395 
396         @Override
397         protected void appendTo(AppendableCharSequence charSeq, char chr) {
398             if (!shouldUnescape) {
399                 super.appendTo(charSeq, chr);
400                 return;
401             }
402 
403             if (chr == '\\') {
404                 if (unescapeInProgress) {
405                     super.appendTo(charSeq, chr);
406                     unescapeInProgress = false;
407                 } else {
408                     unescapeInProgress = true;
409                 }
410                 return;
411             }
412 
413             if (unescapeInProgress) {
414                 if (chr == 'c') {
415                     charSeq.append(':');
416                 } else if (chr == 'r') {
417                     charSeq.append('\r');
418                 } else if (chr == 'n') {
419                     charSeq.append('\n');
420                 } else {
421                     charSeq.append('\\').append(chr);
422                     throw new IllegalArgumentException("received an invalid escape header sequence '" + charSeq + '\'');
423                 }
424 
425                 unescapeInProgress = false;
426                 return;
427             }
428 
429             super.appendTo(charSeq, chr);
430         }
431 
432         @Override
433         protected void reset() {
434             name = null;
435             valid = false;
436             unescapeInProgress = false;
437             super.reset();
438         }
439 
440         private static boolean shouldUnescape(StompCommand command) {
441             return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
442         }
443     }
444 }