1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.stomp;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.DecoderResult;
23 import io.netty.handler.codec.ReplayingDecoder;
24 import io.netty.handler.codec.TooLongFrameException;
25 import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26 import io.netty.util.ByteProcessor;
27 import io.netty.util.internal.AppendableCharSequence;
28 import io.netty.util.internal.StringUtil;
29
30 import java.util.List;
31
32 import static io.netty.buffer.ByteBufUtil.*;
33 import static io.netty.util.internal.ObjectUtil.*;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class StompSubframeDecoder extends ReplayingDecoder<State> {
54
55 private static final int DEFAULT_CHUNK_SIZE = 8132;
56 private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
57
58
59
60
61 @Deprecated
62 public enum State {
63 SKIP_CONTROL_CHARACTERS,
64 READ_HEADERS,
65 READ_CONTENT,
66 FINALIZE_FRAME_READ,
67 BAD_FRAME,
68 INVALID_CHUNK
69 }
70
71 private final Utf8LineParser commandParser;
72 private final HeaderParser headerParser;
73 private final int maxChunkSize;
74 private int alreadyReadChunkSize;
75 private LastStompContentSubframe lastContent;
76 private long contentLength = -1;
77
78 public StompSubframeDecoder() {
79 this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
80 }
81
82 public StompSubframeDecoder(boolean validateHeaders) {
83 this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
84 }
85
86 public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
87 this(maxLineLength, maxChunkSize, false);
88 }
89
90 public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
91 super(State.SKIP_CONTROL_CHARACTERS);
92 checkPositive(maxLineLength, "maxLineLength");
93 checkPositive(maxChunkSize, "maxChunkSize");
94 this.maxChunkSize = maxChunkSize;
95 commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
96 headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
97 }
98
99 @Override
100 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
101 switch (state()) {
102 case SKIP_CONTROL_CHARACTERS:
103 skipControlCharacters(in);
104 checkpoint(State.READ_HEADERS);
105
106 case READ_HEADERS:
107 StompCommand command = StompCommand.UNKNOWN;
108 StompHeadersSubframe frame = null;
109 try {
110 command = readCommand(in);
111 frame = new DefaultStompHeadersSubframe(command);
112 checkpoint(readHeaders(in, frame));
113 out.add(frame);
114 } catch (Exception e) {
115 if (frame == null) {
116 frame = new DefaultStompHeadersSubframe(command);
117 }
118 frame.setDecoderResult(DecoderResult.failure(e));
119 out.add(frame);
120 checkpoint(State.BAD_FRAME);
121 return;
122 }
123 break;
124 case BAD_FRAME:
125 in.skipBytes(actualReadableBytes());
126 return;
127 }
128 try {
129 switch (state()) {
130 case READ_CONTENT:
131 int toRead = in.readableBytes();
132 if (toRead == 0) {
133 return;
134 }
135 if (toRead > maxChunkSize) {
136 toRead = maxChunkSize;
137 }
138 if (contentLength >= 0) {
139 int remainingLength = (int) (contentLength - alreadyReadChunkSize);
140 if (toRead > remainingLength) {
141 toRead = remainingLength;
142 }
143 ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
144 if ((alreadyReadChunkSize += toRead) >= contentLength) {
145 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
146 checkpoint(State.FINALIZE_FRAME_READ);
147 } else {
148 out.add(new DefaultStompContentSubframe(chunkBuffer));
149 return;
150 }
151 } else {
152 int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
153 if (nulIndex == in.readerIndex()) {
154 checkpoint(State.FINALIZE_FRAME_READ);
155 } else {
156 if (nulIndex > 0) {
157 toRead = nulIndex - in.readerIndex();
158 } else {
159 toRead = in.writerIndex() - in.readerIndex();
160 }
161 ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
162 alreadyReadChunkSize += toRead;
163 if (nulIndex > 0) {
164 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
165 checkpoint(State.FINALIZE_FRAME_READ);
166 } else {
167 out.add(new DefaultStompContentSubframe(chunkBuffer));
168 return;
169 }
170 }
171 }
172
173 case FINALIZE_FRAME_READ:
174 skipNullCharacter(in);
175 if (lastContent == null) {
176 lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
177 }
178 out.add(lastContent);
179 resetDecoder();
180 }
181 } catch (Exception e) {
182 if (lastContent != null) {
183 lastContent.release();
184 lastContent = null;
185 }
186
187 StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
188 errorContent.setDecoderResult(DecoderResult.failure(e));
189 out.add(errorContent);
190 checkpoint(State.BAD_FRAME);
191 }
192 }
193
194 private StompCommand readCommand(ByteBuf in) {
195 CharSequence commandSequence = commandParser.parse(in);
196 if (commandSequence == null) {
197 throw new DecoderException("Failed to read command from channel");
198 }
199 String commandStr = commandSequence.toString();
200 try {
201 return StompCommand.valueOf(commandStr);
202 } catch (IllegalArgumentException iae) {
203 throw new DecoderException("Cannot to parse command " + commandStr);
204 }
205 }
206
207 private State readHeaders(ByteBuf buffer, StompHeadersSubframe headersSubframe) {
208 StompHeaders headers = headersSubframe.headers();
209 for (;;) {
210 boolean headerRead = headerParser.parseHeader(headersSubframe, buffer);
211 if (!headerRead) {
212 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
213 contentLength = getContentLength(headers);
214 if (contentLength == 0) {
215 return State.FINALIZE_FRAME_READ;
216 }
217 }
218 return State.READ_CONTENT;
219 }
220 }
221 }
222
223 private static long getContentLength(StompHeaders headers) {
224 long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
225 if (contentLength < 0) {
226 throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
227 }
228 return contentLength;
229 }
230
231 private static void skipNullCharacter(ByteBuf buffer) {
232 byte b = buffer.readByte();
233 if (b != StompConstants.NUL) {
234 throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
235 }
236 }
237
238 private static void skipControlCharacters(ByteBuf buffer) {
239 byte b;
240 for (;;) {
241 b = buffer.readByte();
242 if (b != StompConstants.CR && b != StompConstants.LF) {
243 buffer.readerIndex(buffer.readerIndex() - 1);
244 break;
245 }
246 }
247 }
248
249 private void resetDecoder() {
250 checkpoint(State.SKIP_CONTROL_CHARACTERS);
251 contentLength = -1;
252 alreadyReadChunkSize = 0;
253 lastContent = null;
254 }
255
256 private static class Utf8LineParser implements ByteProcessor {
257
258 private final AppendableCharSequence charSeq;
259 private final int maxLineLength;
260
261 private int lineLength;
262 private char interim;
263 private boolean nextRead;
264
265 Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
266 this.charSeq = checkNotNull(charSeq, "charSeq");
267 this.maxLineLength = maxLineLength;
268 }
269
270 AppendableCharSequence parse(ByteBuf byteBuf) {
271 reset();
272 int offset = byteBuf.forEachByte(this);
273 if (offset == -1) {
274 return null;
275 }
276
277 byteBuf.readerIndex(offset + 1);
278 return charSeq;
279 }
280
281 AppendableCharSequence charSequence() {
282 return charSeq;
283 }
284
285 @Override
286 public boolean process(byte nextByte) throws Exception {
287 if (nextByte == StompConstants.CR) {
288 ++lineLength;
289 return true;
290 }
291
292 if (nextByte == StompConstants.LF) {
293 return false;
294 }
295
296 if (++lineLength > maxLineLength) {
297 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
298 }
299
300
301
302
303 if (nextRead) {
304 interim |= (nextByte & 0x3F) << 6;
305 nextRead = false;
306 } else if (interim != 0) {
307 appendTo(charSeq, (char) (interim | (nextByte & 0x3F)));
308 interim = 0;
309 } else if (nextByte >= 0) {
310
311 appendTo(charSeq, (char) nextByte);
312 } else if ((nextByte & 0xE0) == 0xC0) {
313
314
315 interim = (char) ((nextByte & 0x1F) << 6);
316 } else {
317
318 interim = (char) ((nextByte & 0x0F) << 12);
319 nextRead = true;
320 }
321
322 return true;
323 }
324
325 protected void appendTo(AppendableCharSequence charSeq, char chr) {
326 charSeq.append(chr);
327 }
328
329 protected void reset() {
330 charSeq.reset();
331 lineLength = 0;
332 interim = 0;
333 nextRead = false;
334 }
335 }
336
337 private static final class HeaderParser extends Utf8LineParser {
338
339 private final boolean validateHeaders;
340
341 private String name;
342 private boolean valid;
343
344 private boolean shouldUnescape;
345 private boolean unescapeInProgress;
346
347 HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
348 super(charSeq, maxLineLength);
349 this.validateHeaders = validateHeaders;
350 }
351
352 boolean parseHeader(StompHeadersSubframe headersSubframe, ByteBuf buf) {
353 shouldUnescape = shouldUnescape(headersSubframe.command());
354 AppendableCharSequence value = super.parse(buf);
355 if (value == null || (name == null && value.length() == 0)) {
356 return false;
357 }
358
359 if (valid) {
360 headersSubframe.headers().add(name, value.toString());
361 } else if (validateHeaders) {
362 if (StringUtil.isNullOrEmpty(name)) {
363 throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
364 }
365 String line = name + ':' + value;
366 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
367 + ", " + line);
368 }
369 return true;
370 }
371
372 @Override
373 public boolean process(byte nextByte) throws Exception {
374 if (nextByte == StompConstants.COLON) {
375 if (name == null) {
376 AppendableCharSequence charSeq = charSequence();
377 if (charSeq.length() != 0) {
378 name = charSeq.substring(0, charSeq.length());
379 charSeq.reset();
380 valid = true;
381 return true;
382 } else {
383 name = StringUtil.EMPTY_STRING;
384 }
385 } else {
386 valid = false;
387 }
388 }
389
390 return super.process(nextByte);
391 }
392
393 @Override
394 protected void appendTo(AppendableCharSequence charSeq, char chr) {
395 if (!shouldUnescape) {
396 super.appendTo(charSeq, chr);
397 return;
398 }
399
400 if (chr == '\\') {
401 if (unescapeInProgress) {
402 super.appendTo(charSeq, chr);
403 unescapeInProgress = false;
404 } else {
405 unescapeInProgress = true;
406 }
407 return;
408 }
409
410 if (unescapeInProgress) {
411 if (chr == 'c') {
412 charSeq.append(':');
413 } else if (chr == 'r') {
414 charSeq.append('\r');
415 } else if (chr == 'n') {
416 charSeq.append('\n');
417 } else {
418 charSeq.append('\\').append(chr);
419 throw new IllegalArgumentException("received an invalid escape header sequence '" + charSeq + '\'');
420 }
421
422 unescapeInProgress = false;
423 return;
424 }
425
426 super.appendTo(charSeq, chr);
427 }
428
429 @Override
430 protected void reset() {
431 name = null;
432 valid = false;
433 unescapeInProgress = false;
434 super.reset();
435 }
436
437 private static boolean shouldUnescape(StompCommand command) {
438 return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
439 }
440 }
441 }