View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.DecoderResult;
23  import io.netty.handler.codec.ReplayingDecoder;
24  import io.netty.handler.codec.TooLongFrameException;
25  import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26  import io.netty.util.internal.AppendableCharSequence;
27  import io.netty.util.internal.StringUtil;
28  
29  import java.util.List;
30  import java.util.Locale;
31  
32  import static io.netty.buffer.ByteBufUtil.*;
33  
34  /**
35   * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and
36   * {@link StompContentSubframe}s.
37   *
38   * <h3>Parameters to control memory consumption: </h3>
39   * {@code maxLineLength} the maximum length of line -
40   * restricts length of command and header lines
41   * If the length of the initial line exceeds this value, a
42   * {@link TooLongFrameException} will be raised.
43   * <br>
44   * {@code maxChunkSize}
45   * The maximum length of the content or each chunk.  If the content length
46   * (or the length of each chunk) exceeds this value, the content or chunk
47   * ill be split into multiple {@link StompContentSubframe}s whose length is
48   * {@code maxChunkSize} at maximum.
49   *
50   * <h3>Chunked Content</h3>
51   *
52   * If the content of a stomp message is greater than {@code maxChunkSize}
53   * the transfer encoding of the HTTP message is 'chunked', this decoder
54   * generates multiple {@link StompContentSubframe} instances to avoid excessive memory
55   * consumption. Note, that every message, even with no content decodes with
56   * {@link LastStompContentSubframe} at the end to simplify upstream message parsing.
57   */
58  public class StompSubframeDecoder extends ReplayingDecoder<State> {
59  
60      private static final int DEFAULT_CHUNK_SIZE = 8132;
61      private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
62  
63      enum State {
64          SKIP_CONTROL_CHARACTERS,
65          READ_HEADERS,
66          READ_CONTENT,
67          FINALIZE_FRAME_READ,
68          BAD_FRAME,
69          INVALID_CHUNK
70      }
71  
72      private final int maxLineLength;
73      private final int maxChunkSize;
74      private int alreadyReadChunkSize;
75      private LastStompContentSubframe lastContent;
76      private long contentLength;
77  
78      public StompSubframeDecoder() {
79          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
80      }
81  
82      public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
83          super(State.SKIP_CONTROL_CHARACTERS);
84          if (maxLineLength <= 0) {
85              throw new IllegalArgumentException(
86                      "maxLineLength must be a positive integer: " +
87                              maxLineLength);
88          }
89          if (maxChunkSize <= 0) {
90              throw new IllegalArgumentException(
91                      "maxChunkSize must be a positive integer: " +
92                              maxChunkSize);
93          }
94          this.maxChunkSize = maxChunkSize;
95          this.maxLineLength = maxLineLength;
96      }
97  
98      @Override
99      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
100         switch (state()) {
101             case SKIP_CONTROL_CHARACTERS:
102                 skipControlCharacters(in);
103                 checkpoint(State.READ_HEADERS);
104                 // Fall through.
105             case READ_HEADERS:
106                 StompCommand command = StompCommand.UNKNOWN;
107                 StompHeadersSubframe frame = null;
108                 try {
109                     command = readCommand(in);
110                     frame = new DefaultStompHeadersSubframe(command);
111                     checkpoint(readHeaders(in, frame.headers()));
112                     out.add(frame);
113                 } catch (Exception e) {
114                     if (frame == null) {
115                         frame = new DefaultStompHeadersSubframe(command);
116                     }
117                     frame.setDecoderResult(DecoderResult.failure(e));
118                     out.add(frame);
119                     checkpoint(State.BAD_FRAME);
120                     return;
121                 }
122                 break;
123             case BAD_FRAME:
124                 in.skipBytes(actualReadableBytes());
125                 return;
126         }
127         try {
128             switch (state()) {
129                 case READ_CONTENT:
130                     int toRead = in.readableBytes();
131                     if (toRead == 0) {
132                         return;
133                     }
134                     if (toRead > maxChunkSize) {
135                         toRead = maxChunkSize;
136                     }
137                     int remainingLength = (int) (contentLength - alreadyReadChunkSize);
138                     if (toRead > remainingLength) {
139                         toRead = remainingLength;
140                     }
141                     ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
142                     if ((alreadyReadChunkSize += toRead) >= contentLength) {
143                         lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
144                         checkpoint(State.FINALIZE_FRAME_READ);
145                     } else {
146                         DefaultStompContentSubframe chunk;
147                         chunk = new DefaultStompContentSubframe(chunkBuffer);
148                         out.add(chunk);
149                     }
150                     if (alreadyReadChunkSize < contentLength) {
151                         return;
152                     }
153                     // Fall through.
154                 case FINALIZE_FRAME_READ:
155                     skipNullCharacter(in);
156                     if (lastContent == null) {
157                         lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
158                     }
159                     out.add(lastContent);
160                     resetDecoder();
161             }
162         } catch (Exception e) {
163             StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
164             errorContent.setDecoderResult(DecoderResult.failure(e));
165             out.add(errorContent);
166             checkpoint(State.BAD_FRAME);
167         }
168     }
169 
170     private StompCommand readCommand(ByteBuf in) {
171         String commandStr = readLine(in, maxLineLength);
172         StompCommand command = null;
173         try {
174             command = StompCommand.valueOf(commandStr);
175         } catch (IllegalArgumentException iae) {
176             //do nothing
177         }
178         if (command == null) {
179             commandStr = commandStr.toUpperCase(Locale.US);
180             try {
181                 command = StompCommand.valueOf(commandStr);
182             } catch (IllegalArgumentException iae) {
183                 //do nothing
184             }
185         }
186         if (command == null) {
187             throw new DecoderException("failed to read command from channel");
188         }
189         return command;
190     }
191 
192     private State readHeaders(ByteBuf buffer, StompHeaders headers) {
193         for (;;) {
194             String line = readLine(buffer, maxLineLength);
195             if (!line.isEmpty()) {
196                 String[] split = StringUtil.split(line, ':');
197                 if (split.length == 2) {
198                     headers.add(split[0], split[1]);
199                 }
200             } else {
201                 long contentLength = -1;
202                 if (headers.contains(StompHeaders.CONTENT_LENGTH))  {
203                     contentLength = getContentLength(headers, 0);
204                 } else {
205                     int globalIndex = indexOf(buffer, buffer.readerIndex(),
206                             buffer.writerIndex(), StompConstants.NUL);
207                     if (globalIndex != -1) {
208                         contentLength = globalIndex - buffer.readerIndex();
209                     }
210                 }
211                 if (contentLength > 0) {
212                     this.contentLength = contentLength;
213                     return State.READ_CONTENT;
214                 } else {
215                     return State.FINALIZE_FRAME_READ;
216                 }
217             }
218         }
219     }
220 
221     private static long getContentLength(StompHeaders headers, long defaultValue) {
222         return headers.getLong(StompHeaders.CONTENT_LENGTH, defaultValue);
223     }
224 
225     private static void skipNullCharacter(ByteBuf buffer) {
226         byte b = buffer.readByte();
227         if (b != StompConstants.NUL) {
228             throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
229         }
230     }
231 
232     private static void skipControlCharacters(ByteBuf buffer) {
233         byte b;
234         for (;;) {
235             b = buffer.readByte();
236             if (b != StompConstants.CR && b != StompConstants.LF) {
237                 buffer.readerIndex(buffer.readerIndex() - 1);
238                 break;
239             }
240         }
241     }
242 
243     private static String readLine(ByteBuf buffer, int maxLineLength) {
244         AppendableCharSequence buf = new AppendableCharSequence(128);
245         int lineLength = 0;
246         for (;;) {
247             byte nextByte = buffer.readByte();
248             if (nextByte == StompConstants.CR) {
249                 nextByte = buffer.readByte();
250                 if (nextByte == StompConstants.LF) {
251                     return buf.toString();
252                 }
253             } else if (nextByte == StompConstants.LF) {
254                 return buf.toString();
255             } else {
256                 if (lineLength >= maxLineLength) {
257                     throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
258                 }
259                 lineLength ++;
260                 buf.append((char) nextByte);
261             }
262         }
263     }
264 
265     private void resetDecoder() {
266         checkpoint(State.SKIP_CONTROL_CHARACTERS);
267         contentLength = 0;
268         alreadyReadChunkSize = 0;
269         lastContent = null;
270     }
271 }