View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import java.util.List;
19  import java.util.Locale;
20  
21  import io.netty.buffer.ByteBuf;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.handler.codec.DecoderException;
25  import io.netty.handler.codec.DecoderResult;
26  import io.netty.handler.codec.ReplayingDecoder;
27  import io.netty.handler.codec.TooLongFrameException;
28  import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
29  import io.netty.util.internal.AppendableCharSequence;
30  
31  import static io.netty.buffer.ByteBufUtil.indexOf;
32  import static io.netty.buffer.ByteBufUtil.readBytes;
33  
34  /**
35   * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and
36   * {@link StompContentSubframe}s.
37   *
38   * <h3>Parameters to control memory consumption: </h3>
39   * {@code maxLineLength} the maximum length of line -
40   * restricts length of command and header lines
41   * If the length of the initial line exceeds this value, a
42   * {@link TooLongFrameException} will be raised.
43   * <br>
44   * {@code maxChunkSize}
45   * The maximum length of the content or each chunk.  If the content length
46   * (or the length of each chunk) exceeds this value, the content or chunk
47   * will be split into multiple {@link StompContentSubframe}s whose length is
48   * {@code maxChunkSize} at maximum.
49   *
50   * <h3>Chunked Content</h3>
51   *
52   * If the content of a STOMP message is larger than {@code maxChunkSize},
53   * this decoder generates multiple {@link StompContentSubframe} instances to
54   * avoid excessive memory consumption. Note that every message, even one with
55   * no content, is terminated by a {@link LastStompContentSubframe} to simplify
56   * upstream message parsing.
57   */
58  public class StompSubframeDecoder extends ReplayingDecoder<State> {
59  
60      private static final int DEFAULT_CHUNK_SIZE = 8132;
61      private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
62  
63      enum State {
64          SKIP_CONTROL_CHARACTERS,
65          READ_HEADERS,
66          READ_CONTENT,
67          FINALIZE_FRAME_READ,
68          BAD_FRAME,
69          INVALID_CHUNK
70      }
71  
72      private final int maxLineLength;
73      private final int maxChunkSize;
74      private final boolean validateHeaders;
75      private int alreadyReadChunkSize;
76      private LastStompContentSubframe lastContent;
77      private long contentLength = -1;
78  
79      public StompSubframeDecoder() {
80          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
81      }
82  
83      public StompSubframeDecoder(boolean validateHeaders) {
84          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
85      }
86  
87      public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
88          this(maxLineLength, maxChunkSize, false);
89      }
90  
91      public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
92          super(State.SKIP_CONTROL_CHARACTERS);
93          if (maxLineLength <= 0) {
94              throw new IllegalArgumentException(
95                      "maxLineLength must be a positive integer: " +
96                              maxLineLength);
97          }
98          if (maxChunkSize <= 0) {
99              throw new IllegalArgumentException(
100                     "maxChunkSize must be a positive integer: " +
101                             maxChunkSize);
102         }
103         this.maxChunkSize = maxChunkSize;
104         this.maxLineLength = maxLineLength;
105         this.validateHeaders = validateHeaders;
106     }
107 
108     @Override
109     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
110         switch (state()) {
111             case SKIP_CONTROL_CHARACTERS:
112                 skipControlCharacters(in);
113                 checkpoint(State.READ_HEADERS);
114                 // Fall through.
115             case READ_HEADERS:
116                 StompCommand command = StompCommand.UNKNOWN;
117                 StompHeadersSubframe frame = null;
118                 try {
119                     command = readCommand(in);
120                     frame = new DefaultStompHeadersSubframe(command);
121                     checkpoint(readHeaders(in, frame.headers()));
122                     out.add(frame);
123                 } catch (Exception e) {
124                     if (frame == null) {
125                         frame = new DefaultStompHeadersSubframe(command);
126                     }
127                     frame.setDecoderResult(DecoderResult.failure(e));
128                     out.add(frame);
129                     checkpoint(State.BAD_FRAME);
130                     return;
131                 }
132                 break;
133             case BAD_FRAME:
134                 in.skipBytes(actualReadableBytes());
135                 return;
136         }
137         try {
138             switch (state()) {
139                 case READ_CONTENT:
140                     int toRead = in.readableBytes();
141                     if (toRead == 0) {
142                         return;
143                     }
144                     if (toRead > maxChunkSize) {
145                         toRead = maxChunkSize;
146                     }
147                     if (contentLength >= 0) {
148                         int remainingLength = (int) (contentLength - alreadyReadChunkSize);
149                         if (toRead > remainingLength) {
150                             toRead = remainingLength;
151                         }
152                         ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
153                         if ((alreadyReadChunkSize += toRead) >= contentLength) {
154                             lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
155                             checkpoint(State.FINALIZE_FRAME_READ);
156                         } else {
157                             out.add(new DefaultStompContentSubframe(chunkBuffer));
158                             return;
159                         }
160                     } else {
161                         int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
162                         if (nulIndex == in.readerIndex()) {
163                             checkpoint(State.FINALIZE_FRAME_READ);
164                         } else {
165                             if (nulIndex > 0) {
166                                 toRead = nulIndex - in.readerIndex();
167                             } else {
168                                 toRead = in.writerIndex() - in.readerIndex();
169                             }
170                             ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
171                             alreadyReadChunkSize += toRead;
172                             if (nulIndex > 0) {
173                                 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
174                                 checkpoint(State.FINALIZE_FRAME_READ);
175                             } else {
176                                 out.add(new DefaultStompContentSubframe(chunkBuffer));
177                                 return;
178                             }
179                         }
180                     }
181                     // Fall through.
182                 case FINALIZE_FRAME_READ:
183                     skipNullCharacter(in);
184                     if (lastContent == null) {
185                         lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
186                     }
187                     out.add(lastContent);
188                     resetDecoder();
189             }
190         } catch (Exception e) {
191             StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
192             errorContent.setDecoderResult(DecoderResult.failure(e));
193             out.add(errorContent);
194             checkpoint(State.BAD_FRAME);
195         }
196     }
197 
198     private StompCommand readCommand(ByteBuf in) {
199         String commandStr = readLine(in, maxLineLength);
200         StompCommand command = null;
201         try {
202             command = StompCommand.valueOf(commandStr);
203         } catch (IllegalArgumentException iae) {
204             //do nothing
205         }
206         if (command == null) {
207             commandStr = commandStr.toUpperCase(Locale.US);
208             try {
209                 command = StompCommand.valueOf(commandStr);
210             } catch (IllegalArgumentException iae) {
211                 //do nothing
212             }
213         }
214         if (command == null) {
215             throw new DecoderException("failed to read command from channel");
216         }
217         return command;
218     }
219 
220     private State readHeaders(ByteBuf buffer, StompHeaders headers) {
221         for (;;) {
222             String line = readLine(buffer, maxLineLength);
223             if (!line.isEmpty()) {
224                 String[] split = line.split(":");
225                 if (split.length == 2) {
226                     headers.add(split[0], split[1]);
227                 } else if (validateHeaders) {
228                     throw new IllegalArgumentException("a header value or name contains a prohibited character ':'" +
229                             ", " + line);
230                 }
231             } else {
232                 if (headers.contains(StompHeaders.CONTENT_LENGTH))  {
233                     contentLength = getContentLength(headers, 0);
234                     if (contentLength == 0) {
235                         return State.FINALIZE_FRAME_READ;
236                     }
237                 }
238                 return State.READ_CONTENT;
239             }
240         }
241     }
242 
243     private static long getContentLength(StompHeaders headers, long defaultValue) {
244         long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, defaultValue);
245         if (contentLength < 0) {
246             throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
247         }
248         return contentLength;
249     }
250 
251     private static void skipNullCharacter(ByteBuf buffer) {
252         byte b = buffer.readByte();
253         if (b != StompConstants.NUL) {
254             throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
255         }
256     }
257 
258     private static void skipControlCharacters(ByteBuf buffer) {
259         byte b;
260         for (;;) {
261             b = buffer.readByte();
262             if (b != StompConstants.CR && b != StompConstants.LF) {
263                 buffer.readerIndex(buffer.readerIndex() - 1);
264                 break;
265             }
266         }
267     }
268 
269     private static String readLine(ByteBuf buffer, int maxLineLength) {
270         AppendableCharSequence buf = new AppendableCharSequence(128);
271         int lineLength = 0;
272         for (;;) {
273             byte nextByte = buffer.readByte();
274             if (nextByte == StompConstants.CR) {
275                 nextByte = buffer.readByte();
276                 if (nextByte == StompConstants.LF) {
277                     return buf.toString();
278                 }
279             } else if (nextByte == StompConstants.LF) {
280                 return buf.toString();
281             } else {
282                 if (lineLength >= maxLineLength) {
283                     throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
284                 }
285                 lineLength ++;
286                 buf.append((char) nextByte);
287             }
288         }
289     }
290 
291     private void resetDecoder() {
292         checkpoint(State.SKIP_CONTROL_CHARACTERS);
293         contentLength = -1;
294         alreadyReadChunkSize = 0;
295         lastContent = null;
296     }
297 }