1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.stomp;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.DecoderResult;
23 import io.netty.handler.codec.ReplayingDecoder;
24 import io.netty.handler.codec.TooLongFrameException;
25 import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26 import io.netty.util.ByteProcessor;
27 import io.netty.util.internal.AppendableCharSequence;
28 import io.netty.util.internal.StringUtil;
29
30 import java.util.List;
31
32 import static io.netty.buffer.ByteBufUtil.*;
33 import static io.netty.util.internal.ObjectUtil.*;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class StompSubframeDecoder extends ReplayingDecoder<State> {
54
55 private static final int DEFAULT_CHUNK_SIZE = 8132;
56 private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
57
58
59
60
61 @Deprecated
62 public enum State {
63 SKIP_CONTROL_CHARACTERS,
64 READ_HEADERS,
65 READ_CONTENT,
66 FINALIZE_FRAME_READ,
67 BAD_FRAME,
68 INVALID_CHUNK
69 }
70
71 private final Utf8LineParser commandParser;
72 private final HeaderParser headerParser;
73 private final int maxChunkSize;
74 private int alreadyReadChunkSize;
75 private LastStompContentSubframe lastContent;
76 private long contentLength = -1;
77
78 public StompSubframeDecoder() {
79 this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
80 }
81
82 public StompSubframeDecoder(boolean validateHeaders) {
83 this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
84 }
85
86 public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
87 this(maxLineLength, maxChunkSize, false);
88 }
89
90 public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
91 super(State.SKIP_CONTROL_CHARACTERS);
92 checkPositive(maxLineLength, "maxLineLength");
93 checkPositive(maxChunkSize, "maxChunkSize");
94 this.maxChunkSize = maxChunkSize;
95 commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
96 headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
97 }
98
99 @Override
100 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
101 switch (state()) {
102 case SKIP_CONTROL_CHARACTERS:
103 skipControlCharacters(in);
104 checkpoint(State.READ_HEADERS);
105
106 case READ_HEADERS:
107 StompCommand command = StompCommand.UNKNOWN;
108 StompHeadersSubframe frame = null;
109 try {
110 command = readCommand(in);
111 frame = new DefaultStompHeadersSubframe(command);
112 checkpoint(readHeaders(in, frame));
113 out.add(frame);
114 } catch (Exception e) {
115 if (frame == null) {
116 frame = new DefaultStompHeadersSubframe(command);
117 }
118 frame.setDecoderResult(DecoderResult.failure(e));
119 out.add(frame);
120 checkpoint(State.BAD_FRAME);
121 return;
122 }
123 break;
124 case BAD_FRAME:
125 in.skipBytes(actualReadableBytes());
126 return;
127 }
128 try {
129 switch (state()) {
130 case READ_CONTENT:
131 int toRead = in.readableBytes();
132 if (toRead == 0) {
133 return;
134 }
135 if (toRead > maxChunkSize) {
136 toRead = maxChunkSize;
137 }
138 if (contentLength >= 0) {
139 int remainingLength = (int) (contentLength - alreadyReadChunkSize);
140 if (toRead > remainingLength) {
141 toRead = remainingLength;
142 }
143 ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
144 if ((alreadyReadChunkSize += toRead) >= contentLength) {
145 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
146 checkpoint(State.FINALIZE_FRAME_READ);
147 } else {
148 out.add(new DefaultStompContentSubframe(chunkBuffer));
149 return;
150 }
151 } else {
152 int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
153 if (nulIndex == in.readerIndex()) {
154 checkpoint(State.FINALIZE_FRAME_READ);
155 } else {
156 if (nulIndex > 0) {
157 toRead = nulIndex - in.readerIndex();
158 } else {
159 toRead = in.writerIndex() - in.readerIndex();
160 }
161 ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
162 alreadyReadChunkSize += toRead;
163 if (nulIndex > 0) {
164 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
165 checkpoint(State.FINALIZE_FRAME_READ);
166 } else {
167 out.add(new DefaultStompContentSubframe(chunkBuffer));
168 return;
169 }
170 }
171 }
172
173 case FINALIZE_FRAME_READ:
174 skipNullCharacter(in);
175 if (lastContent == null) {
176 lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
177 }
178 out.add(lastContent);
179 resetDecoder();
180 }
181 } catch (Exception e) {
182 if (lastContent != null) {
183 lastContent.release();
184 lastContent = null;
185 }
186
187 StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
188 errorContent.setDecoderResult(DecoderResult.failure(e));
189 out.add(errorContent);
190 checkpoint(State.BAD_FRAME);
191 }
192 }
193
194 private StompCommand readCommand(ByteBuf in) {
195 CharSequence commandSequence = commandParser.parse(in);
196 if (commandSequence == null) {
197 throw new DecoderException("Failed to read command from channel");
198 }
199 String commandStr = commandSequence.toString();
200 try {
201 return StompCommand.valueOf(commandStr);
202 } catch (IllegalArgumentException iae) {
203 throw new DecoderException("Cannot to parse command " + commandStr);
204 }
205 }
206
207 private State readHeaders(ByteBuf buffer, StompHeadersSubframe headersSubframe) {
208 StompHeaders headers = headersSubframe.headers();
209 for (;;) {
210 boolean headerRead = headerParser.parseHeader(headersSubframe, buffer);
211 if (!headerRead) {
212 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
213 contentLength = getContentLength(headers);
214 if (contentLength == 0) {
215 return State.FINALIZE_FRAME_READ;
216 }
217 }
218 return State.READ_CONTENT;
219 }
220 }
221 }
222
223 private static long getContentLength(StompHeaders headers) {
224 long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
225 if (contentLength < 0) {
226 throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
227 }
228 return contentLength;
229 }
230
231 private static void skipNullCharacter(ByteBuf buffer) {
232 byte b = buffer.readByte();
233 if (b != StompConstants.NUL) {
234 throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
235 }
236 }
237
238 private static void skipControlCharacters(ByteBuf buffer) {
239 byte b;
240 for (;;) {
241 if (!buffer.isReadable()) {
242 return;
243 }
244 b = buffer.readByte();
245 if (b != StompConstants.CR && b != StompConstants.LF) {
246 buffer.readerIndex(buffer.readerIndex() - 1);
247 break;
248 }
249 }
250 }
251
252 private void resetDecoder() {
253 checkpoint(State.SKIP_CONTROL_CHARACTERS);
254 contentLength = -1;
255 alreadyReadChunkSize = 0;
256 lastContent = null;
257 }
258
259 private static class Utf8LineParser implements ByteProcessor {
260
261 private final AppendableCharSequence charSeq;
262 private final int maxLineLength;
263
264 private int lineLength;
265 private char interim;
266 private boolean nextRead;
267
268 Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
269 this.charSeq = checkNotNull(charSeq, "charSeq");
270 this.maxLineLength = maxLineLength;
271 }
272
273 AppendableCharSequence parse(ByteBuf byteBuf) {
274 reset();
275 int offset = byteBuf.forEachByte(this);
276 if (offset == -1) {
277 return null;
278 }
279
280 byteBuf.readerIndex(offset + 1);
281 return charSeq;
282 }
283
284 AppendableCharSequence charSequence() {
285 return charSeq;
286 }
287
288 @Override
289 public boolean process(byte nextByte) throws Exception {
290 if (nextByte == StompConstants.CR) {
291 ++lineLength;
292 return true;
293 }
294
295 if (nextByte == StompConstants.LF) {
296 return false;
297 }
298
299 if (++lineLength > maxLineLength) {
300 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
301 }
302
303
304
305
306 if (nextRead) {
307 interim |= (nextByte & 0x3F) << 6;
308 nextRead = false;
309 } else if (interim != 0) {
310 appendTo(charSeq, (char) (interim | (nextByte & 0x3F)));
311 interim = 0;
312 } else if (nextByte >= 0) {
313
314 appendTo(charSeq, (char) nextByte);
315 } else if ((nextByte & 0xE0) == 0xC0) {
316
317
318 interim = (char) ((nextByte & 0x1F) << 6);
319 } else {
320
321 interim = (char) ((nextByte & 0x0F) << 12);
322 nextRead = true;
323 }
324
325 return true;
326 }
327
328 protected void appendTo(AppendableCharSequence charSeq, char chr) {
329 charSeq.append(chr);
330 }
331
332 protected void reset() {
333 charSeq.reset();
334 lineLength = 0;
335 interim = 0;
336 nextRead = false;
337 }
338 }
339
340 private static final class HeaderParser extends Utf8LineParser {
341
342 private final boolean validateHeaders;
343
344 private String name;
345 private boolean valid;
346
347 private boolean shouldUnescape;
348 private boolean unescapeInProgress;
349
350 HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
351 super(charSeq, maxLineLength);
352 this.validateHeaders = validateHeaders;
353 }
354
355 boolean parseHeader(StompHeadersSubframe headersSubframe, ByteBuf buf) {
356 shouldUnescape = shouldUnescape(headersSubframe.command());
357 AppendableCharSequence value = super.parse(buf);
358 if (value == null || (name == null && value.length() == 0)) {
359 return false;
360 }
361
362 if (valid) {
363 headersSubframe.headers().add(name, value.toString());
364 } else if (validateHeaders) {
365 if (StringUtil.isNullOrEmpty(name)) {
366 throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
367 }
368 String line = name + ':' + value;
369 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
370 + ", " + line);
371 }
372 return true;
373 }
374
375 @Override
376 public boolean process(byte nextByte) throws Exception {
377 if (nextByte == StompConstants.COLON) {
378 if (name == null) {
379 AppendableCharSequence charSeq = charSequence();
380 if (charSeq.length() != 0) {
381 name = charSeq.substring(0, charSeq.length());
382 charSeq.reset();
383 valid = true;
384 return true;
385 } else {
386 name = StringUtil.EMPTY_STRING;
387 }
388 } else {
389 valid = false;
390 }
391 }
392
393 return super.process(nextByte);
394 }
395
396 @Override
397 protected void appendTo(AppendableCharSequence charSeq, char chr) {
398 if (!shouldUnescape) {
399 super.appendTo(charSeq, chr);
400 return;
401 }
402
403 if (chr == '\\') {
404 if (unescapeInProgress) {
405 super.appendTo(charSeq, chr);
406 unescapeInProgress = false;
407 } else {
408 unescapeInProgress = true;
409 }
410 return;
411 }
412
413 if (unescapeInProgress) {
414 if (chr == 'c') {
415 charSeq.append(':');
416 } else if (chr == 'r') {
417 charSeq.append('\r');
418 } else if (chr == 'n') {
419 charSeq.append('\n');
420 } else {
421 charSeq.append('\\').append(chr);
422 throw new IllegalArgumentException("received an invalid escape header sequence '" + charSeq + '\'');
423 }
424
425 unescapeInProgress = false;
426 return;
427 }
428
429 super.appendTo(charSeq, chr);
430 }
431
432 @Override
433 protected void reset() {
434 name = null;
435 valid = false;
436 unescapeInProgress = false;
437 super.reset();
438 }
439
440 private static boolean shouldUnescape(StompCommand command) {
441 return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
442 }
443 }
444 }