1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.Signal;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.ArrayList;
31 import java.util.List;
32
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
41
42
43
44
45
46
47
48
49
50 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
51
52
53
54
55
56
/**
 * States of the decoding state machine driven by {@code decode}.
 */
enum DecoderState {
    // Initial state (passed to the superclass constructor) and the state restored after a full message.
    READ_FIXED_HEADER,
    // Entered once the fixed header has been decoded.
    READ_VARIABLE_HEADER,
    // Entered once the variable header has been decoded.
    READ_PAYLOAD,
    // Entered after any decoding failure; in this state all readable bytes are discarded.
    BAD_MESSAGE,
}
63
// Fixed header of the message currently being decoded; reset to null once the message is emitted.
private MqttFixedHeader mqttFixedHeader;
// Variable header of the message currently being decoded; null for PINGREQ/PINGRESP.
private Object variableHeader;
// Bytes of the current message's variable part (variable header + payload) not yet accounted for.
private int bytesRemainingInVariablePart;

// Largest accepted remaining length; longer messages are rejected with a TooLongFrameException.
private final int maxBytesInMessage;
// Limit handed to the client identifier validation of CONNECT payloads.
private final int maxClientIdLength;
70
/**
 * Creates a decoder that uses {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 */
public MqttDecoder() {
    this(DEFAULT_MAX_BYTES_IN_MESSAGE);
}

/**
 * Creates a decoder with a custom message size limit and {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 *
 * @param maxBytesInMessage largest accepted remaining length of a message, must be positive
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}

/**
 * Creates a decoder with custom limits. The decoder starts in {@code READ_FIXED_HEADER}.
 *
 * @param maxBytesInMessage largest accepted remaining length of a message, must be positive
 * @param maxClientIdLength limit passed to client identifier validation, must be positive
 */
public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
    super(DecoderState.READ_FIXED_HEADER);
    this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
    this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
}
84
85 @Override
86 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
87 switch (state()) {
88 case READ_FIXED_HEADER: try {
89 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
90 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
91 checkpoint(DecoderState.READ_VARIABLE_HEADER);
92
93 } catch (Exception cause) {
94 out.add(invalidMessage(cause));
95 return;
96 }
97
98 case READ_VARIABLE_HEADER: try {
99 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
100 boolean bailOut = false;
101 try {
102 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
103 } catch (Signal signal) {
104 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
105
106
107
108 bailOut = true;
109 } else {
110
111 throw signal;
112 }
113 }
114 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
115 buffer.skipBytes(actualReadableBytes());
116 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
117 + bytesRemainingBeforeVariableHeader);
118 }
119 checkpoint(DecoderState.READ_PAYLOAD);
120
121 } catch (Exception cause) {
122 out.add(invalidMessage(cause));
123 return;
124 }
125
126 case READ_PAYLOAD: try {
127 final Object decodedPayload =
128 decodePayload(
129 ctx,
130 buffer,
131 mqttFixedHeader.messageType(),
132 maxClientIdLength,
133 variableHeader);
134 checkpoint(DecoderState.READ_FIXED_HEADER);
135 MqttMessage message = MqttMessageFactory.newMessage(
136 mqttFixedHeader, variableHeader, decodedPayload);
137 mqttFixedHeader = null;
138 variableHeader = null;
139 out.add(message);
140 break;
141 } catch (Exception cause) {
142 out.add(invalidMessage(cause));
143 return;
144 }
145
146 case BAD_MESSAGE:
147
148 buffer.skipBytes(actualReadableBytes());
149 break;
150
151 default:
152
153 throw new Error();
154 }
155 }
156
157 private MqttMessage invalidMessage(Throwable cause) {
158 checkpoint(DecoderState.BAD_MESSAGE);
159 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
160 }
161
162
163
164
165
166
167
168
169
170
171
172 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
173 short b1 = buffer.readUnsignedByte();
174
175 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
176 boolean dupFlag = (b1 & 0x08) == 0x08;
177 int qosLevel = (b1 & 0x06) >> 1;
178 boolean retain = (b1 & 0x01) != 0;
179
180 switch (messageType) {
181 case PUBLISH:
182 if (qosLevel == 3) {
183 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
184 + qosLevel + ')');
185 }
186 break;
187
188 case PUBREL:
189 case SUBSCRIBE:
190 case UNSUBSCRIBE:
191 if (dupFlag) {
192 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
193 + " message, must be 0, found 1");
194 }
195 if (qosLevel != 1) {
196 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
197 + " message, must be 1, found " + qosLevel);
198 }
199 if (retain) {
200 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
201 + " message, must be 0, found 1");
202 }
203 break;
204
205 case AUTH:
206 case CONNACK:
207 case CONNECT:
208 case DISCONNECT:
209 case PINGREQ:
210 case PINGRESP:
211 case PUBACK:
212 case PUBCOMP:
213 case PUBREC:
214 case SUBACK:
215 case UNSUBACK:
216 if (dupFlag) {
217 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
218 + " message, must be 0, found 1");
219 }
220 if (qosLevel != 0) {
221 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
222 + " message, must be 0, found " + qosLevel);
223 }
224 if (retain) {
225 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
226 + " message, must be 0, found 1");
227 }
228 break;
229 default:
230 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
231 }
232
233 int remainingLength = 0;
234 int multiplier = 1;
235 short digit;
236 int loops = 0;
237 do {
238 digit = buffer.readUnsignedByte();
239 remainingLength += (digit & 127) * multiplier;
240 multiplier *= 128;
241 loops++;
242 } while ((digit & 128) != 0 && loops < 4);
243
244
245 if (loops == 4 && (digit & 128) != 0) {
246 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
247 }
248 MqttFixedHeader decodedFixedHeader =
249 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
250 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
251 }
252
253
254
255
256
257
258
259 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
260 switch (mqttFixedHeader.messageType()) {
261 case CONNECT:
262 return decodeConnectionVariableHeader(ctx, buffer);
263
264 case CONNACK:
265 return decodeConnAckVariableHeader(ctx, buffer);
266
267 case UNSUBSCRIBE:
268 case SUBSCRIBE:
269 case SUBACK:
270 case UNSUBACK:
271 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
272
273 case PUBACK:
274 case PUBREC:
275 case PUBCOMP:
276 case PUBREL:
277 return decodePubReplyMessage(buffer);
278
279 case PUBLISH:
280 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
281
282 case DISCONNECT:
283 case AUTH:
284 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
285
286 case PINGREQ:
287 case PINGRESP:
288
289 return null;
290 default:
291
292 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
293 }
294 }
295
296 private MqttConnectVariableHeader decodeConnectionVariableHeader(
297 ChannelHandlerContext ctx,
298 ByteBuf buffer) {
299 final Result<String> protoString = decodeString(buffer);
300 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
301
302 final byte protocolLevel = buffer.readByte();
303 numberOfBytesConsumed += 1;
304
305 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
306 MqttCodecUtil.setMqttVersion(ctx, version);
307
308 final int b1 = buffer.readUnsignedByte();
309 numberOfBytesConsumed += 1;
310
311 final int keepAlive = decodeMsbLsb(buffer);
312 numberOfBytesConsumed += 2;
313
314 final boolean hasUserName = (b1 & 0x80) == 0x80;
315 final boolean hasPassword = (b1 & 0x40) == 0x40;
316 final boolean willRetain = (b1 & 0x20) == 0x20;
317 final int willQos = (b1 & 0x18) >> 3;
318 final boolean willFlag = (b1 & 0x04) == 0x04;
319 final boolean cleanSession = (b1 & 0x02) == 0x02;
320 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
321 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
322 if (!zeroReservedFlag) {
323
324
325
326 throw new DecoderException("non-zero reserved flag");
327 }
328 }
329
330 final MqttProperties properties;
331 if (version == MqttVersion.MQTT_5) {
332 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
333 properties = propertiesResult.value;
334 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
335 } else {
336 properties = MqttProperties.NO_PROPERTIES;
337 }
338
339 bytesRemainingInVariablePart -= numberOfBytesConsumed;
340 return new MqttConnectVariableHeader(
341 version.protocolName(),
342 version.protocolLevel(),
343 hasUserName,
344 hasPassword,
345 willRetain,
346 willQos,
347 willFlag,
348 cleanSession,
349 keepAlive,
350 properties);
351 }
352
353 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
354 ChannelHandlerContext ctx,
355 ByteBuf buffer) {
356 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
357 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
358 byte returnCode = buffer.readByte();
359
360 final MqttProperties properties;
361 if (mqttVersion == MqttVersion.MQTT_5) {
362 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
363 properties = propertiesResult.value;
364 bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
365 } else {
366 properties = MqttProperties.NO_PROPERTIES;
367 bytesRemainingInVariablePart -= 2;
368 }
369
370 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
371 }
372
373 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
374 ChannelHandlerContext ctx,
375 ByteBuf buffer) {
376 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
377 final int packetId = decodeMessageId(buffer);
378
379 if (mqttVersion == MqttVersion.MQTT_5) {
380 final Result<MqttProperties> properties = decodeProperties(buffer);
381 bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
382 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
383 } else {
384 bytesRemainingInVariablePart -= 2;
385 return new MqttMessageIdAndPropertiesVariableHeader(packetId,
386 MqttProperties.NO_PROPERTIES);
387 }
388 }
389
390 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
391 final int packetId = decodeMessageId(buffer);
392
393 final int packetIdNumberOfBytesConsumed = 2;
394 if (bytesRemainingInVariablePart > 3) {
395 final byte reasonCode = buffer.readByte();
396 final Result<MqttProperties> properties = decodeProperties(buffer);
397 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
398 return new MqttPubReplyMessageVariableHeader(packetId,
399 reasonCode,
400 properties.value);
401 } else if (bytesRemainingInVariablePart > 2) {
402 final byte reasonCode = buffer.readByte();
403 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
404 return new MqttPubReplyMessageVariableHeader(packetId,
405 reasonCode,
406 MqttProperties.NO_PROPERTIES);
407 } else {
408 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
409 return new MqttPubReplyMessageVariableHeader(packetId,
410 (byte) 0,
411 MqttProperties.NO_PROPERTIES);
412 }
413 }
414
415 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
416 ByteBuf buffer) {
417 final byte reasonCode;
418 final MqttProperties properties;
419 if (bytesRemainingInVariablePart > 1) {
420 reasonCode = buffer.readByte();
421 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
422 properties = propertiesResult.value;
423 bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
424 } else if (bytesRemainingInVariablePart > 0) {
425 reasonCode = buffer.readByte();
426 properties = MqttProperties.NO_PROPERTIES;
427 --bytesRemainingInVariablePart;
428 } else {
429 reasonCode = 0;
430 properties = MqttProperties.NO_PROPERTIES;
431 }
432
433 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
434 }
435
436 private MqttPublishVariableHeader decodePublishVariableHeader(
437 ChannelHandlerContext ctx,
438 ByteBuf buffer,
439 MqttFixedHeader mqttFixedHeader) {
440 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
441 final Result<String> decodedTopic = decodeString(buffer);
442 if (!isValidPublishTopicName(decodedTopic.value)) {
443 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
444 }
445 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
446
447 int messageId = -1;
448 if (mqttFixedHeader.qosLevel().value() > 0) {
449 messageId = decodeMessageId(buffer);
450 numberOfBytesConsumed += 2;
451 }
452
453 final MqttProperties properties;
454 if (mqttVersion == MqttVersion.MQTT_5) {
455 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
456 properties = propertiesResult.value;
457 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
458 } else {
459 properties = MqttProperties.NO_PROPERTIES;
460 }
461
462 bytesRemainingInVariablePart -= numberOfBytesConsumed;
463 return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
464 }
465
466
467
468
469 private static int decodeMessageId(ByteBuf buffer) {
470 final int messageId = decodeMsbLsb(buffer);
471 if (!isValidMessageId(messageId)) {
472 throw new DecoderException("invalid messageId: " + messageId);
473 }
474 return messageId;
475 }
476
477
478
479
480
481
482
483
484
485 private Object decodePayload(
486 ChannelHandlerContext ctx,
487 ByteBuf buffer,
488 MqttMessageType messageType,
489 int maxClientIdLength,
490 Object variableHeader) {
491 switch (messageType) {
492 case CONNECT:
493 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
494
495 case SUBSCRIBE:
496 return decodeSubscribePayload(buffer);
497
498 case SUBACK:
499 return decodeSubackPayload(buffer);
500
501 case UNSUBSCRIBE:
502 return decodeUnsubscribePayload(buffer);
503
504 case UNSUBACK:
505 return decodeUnsubAckPayload(ctx, buffer);
506
507 case PUBLISH:
508 return decodePublishPayload(buffer);
509
510 default:
511
512
513
514
515 validateNoBytesRemain(0);
516 return null;
517 }
518 }
519
520 private MqttConnectPayload decodeConnectionPayload(
521 ByteBuf buffer,
522 int maxClientIdLength,
523 MqttConnectVariableHeader mqttConnectVariableHeader) {
524 final Result<String> decodedClientId = decodeString(buffer);
525 final String decodedClientIdValue = decodedClientId.value;
526 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
527 (byte) mqttConnectVariableHeader.version());
528 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
529 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
530 }
531 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
532
533 Result<String> decodedWillTopic = null;
534 byte[] decodedWillMessage = null;
535
536 final MqttProperties willProperties;
537 if (mqttConnectVariableHeader.isWillFlag()) {
538 if (mqttVersion == MqttVersion.MQTT_5) {
539 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
540 willProperties = propertiesResult.value;
541 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
542 } else {
543 willProperties = MqttProperties.NO_PROPERTIES;
544 }
545 decodedWillTopic = decodeString(buffer, 0, 32767);
546 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
547 decodedWillMessage = decodeByteArray(buffer);
548 numberOfBytesConsumed += decodedWillMessage.length + 2;
549 } else {
550 willProperties = MqttProperties.NO_PROPERTIES;
551 }
552 Result<String> decodedUserName = null;
553 byte[] decodedPassword = null;
554 if (mqttConnectVariableHeader.hasUserName()) {
555 decodedUserName = decodeString(buffer);
556 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
557 }
558 if (mqttConnectVariableHeader.hasPassword()) {
559 decodedPassword = decodeByteArray(buffer);
560 numberOfBytesConsumed += decodedPassword.length + 2;
561 }
562
563 validateNoBytesRemain(numberOfBytesConsumed);
564 return new MqttConnectPayload(
565 decodedClientId.value,
566 willProperties,
567 decodedWillTopic != null ? decodedWillTopic.value : null,
568 decodedWillMessage,
569 decodedUserName != null ? decodedUserName.value : null,
570 decodedPassword);
571 }
572
573 private MqttSubscribePayload decodeSubscribePayload(
574 ByteBuf buffer) {
575 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
576 int numberOfBytesConsumed = 0;
577 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
578 final Result<String> decodedTopicName = decodeString(buffer);
579 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
580
581 final short optionByte = buffer.readUnsignedByte();
582
583 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
584 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
585 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
586 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
587
588 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
589 noLocal,
590 retainAsPublished,
591 retainHandling);
592
593 numberOfBytesConsumed++;
594 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
595 }
596 validateNoBytesRemain(numberOfBytesConsumed);
597 return new MqttSubscribePayload(subscribeTopics);
598 }
599
600 private MqttSubAckPayload decodeSubackPayload(
601 ByteBuf buffer) {
602 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
603 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
604 int numberOfBytesConsumed = 0;
605 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
606 int reasonCode = buffer.readUnsignedByte();
607 numberOfBytesConsumed++;
608 grantedQos.add(reasonCode);
609 }
610 validateNoBytesRemain(numberOfBytesConsumed);
611 return new MqttSubAckPayload(grantedQos);
612 }
613
614 private MqttUnsubAckPayload decodeUnsubAckPayload(
615 ChannelHandlerContext ctx,
616 ByteBuf buffer) {
617 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
618 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
619 int numberOfBytesConsumed = 0;
620 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
621 short reasonCode = buffer.readUnsignedByte();
622 numberOfBytesConsumed++;
623 reasonCodes.add(reasonCode);
624 }
625 validateNoBytesRemain(numberOfBytesConsumed);
626 return new MqttUnsubAckPayload(reasonCodes);
627 }
628
629 private MqttUnsubscribePayload decodeUnsubscribePayload(
630 ByteBuf buffer) {
631 final List<String> unsubscribeTopics = new ArrayList<String>();
632 int numberOfBytesConsumed = 0;
633 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
634 final Result<String> decodedTopicName = decodeString(buffer);
635 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
636 unsubscribeTopics.add(decodedTopicName.value);
637 }
638 validateNoBytesRemain(numberOfBytesConsumed);
639 return new MqttUnsubscribePayload(unsubscribeTopics);
640 }
641
642 private ByteBuf decodePublishPayload(ByteBuf buffer) {
643 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
644 }
645
646 private void validateNoBytesRemain(int numberOfBytesConsumed) {
647 bytesRemainingInVariablePart -= numberOfBytesConsumed;
648 if (bytesRemainingInVariablePart != 0) {
649 throw new DecoderException(
650 "non-zero remaining payload bytes: " +
651 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
652 }
653 }
654
655 private static Result<String> decodeString(ByteBuf buffer) {
656 return decodeString(buffer, 0, Integer.MAX_VALUE);
657 }
658
659 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
660 int size = decodeMsbLsb(buffer);
661 int numberOfBytesConsumed = 2;
662 if (size < minBytes || size > maxBytes) {
663 buffer.skipBytes(size);
664 numberOfBytesConsumed += size;
665 return new Result<String>(null, numberOfBytesConsumed);
666 }
667 String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
668 buffer.skipBytes(size);
669 numberOfBytesConsumed += size;
670 return new Result<String>(s, numberOfBytesConsumed);
671 }
672
673
674
675
676
677 private static byte[] decodeByteArray(ByteBuf buffer) {
678 int size = decodeMsbLsb(buffer);
679 byte[] bytes = new byte[size];
680 buffer.readBytes(bytes);
681 return bytes;
682 }
683
684
685 private static long packInts(int a, int b) {
686 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
687 }
688
689 private static int unpackA(long ints) {
690 return (int) (ints >> 32);
691 }
692
693 private static int unpackB(long ints) {
694 return (int) ints;
695 }
696
697
698
699
700 private static int decodeMsbLsb(ByteBuf buffer) {
701 int min = 0;
702 int max = 65535;
703 short msbSize = buffer.readUnsignedByte();
704 short lsbSize = buffer.readUnsignedByte();
705 int result = msbSize << 8 | lsbSize;
706 if (result < min || result > max) {
707 result = -1;
708 }
709 return result;
710 }
711
712
713
714
715
716
717
718
719 private static long decodeVariableByteInteger(ByteBuf buffer) {
720 int remainingLength = 0;
721 int multiplier = 1;
722 short digit;
723 int loops = 0;
724 do {
725 digit = buffer.readUnsignedByte();
726 remainingLength += (digit & 127) * multiplier;
727 multiplier *= 128;
728 loops++;
729 } while ((digit & 128) != 0 && loops < 4);
730
731 if (loops == 4 && (digit & 128) != 0) {
732 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
733 }
734 return packInts(remainingLength, loops);
735 }
736
737 private static final class Result<T> {
738
739 private final T value;
740 private final int numberOfBytesConsumed;
741
742 Result(T value, int numberOfBytesConsumed) {
743 this.value = value;
744 this.numberOfBytesConsumed = numberOfBytesConsumed;
745 }
746 }
747
748 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
749 final long propertiesLength = decodeVariableByteInteger(buffer);
750 int totalPropertiesLength = unpackA(propertiesLength);
751 int numberOfBytesConsumed = unpackB(propertiesLength);
752 if (buffer.readableBytes() < totalPropertiesLength) {
753
754 buffer.readSlice(totalPropertiesLength);
755 }
756
757 MqttProperties decodedProperties = new MqttProperties();
758 while (numberOfBytesConsumed < totalPropertiesLength) {
759 long propertyId = decodeVariableByteInteger(buffer);
760 final int propertyIdValue = unpackA(propertyId);
761 numberOfBytesConsumed += unpackB(propertyId);
762 MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
763 switch (propertyType) {
764 case PAYLOAD_FORMAT_INDICATOR:
765 case REQUEST_PROBLEM_INFORMATION:
766 case REQUEST_RESPONSE_INFORMATION:
767 case MAXIMUM_QOS:
768 case RETAIN_AVAILABLE:
769 case WILDCARD_SUBSCRIPTION_AVAILABLE:
770 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
771 case SHARED_SUBSCRIPTION_AVAILABLE:
772 final int b1 = buffer.readUnsignedByte();
773 numberOfBytesConsumed++;
774 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
775 break;
776 case SERVER_KEEP_ALIVE:
777 case RECEIVE_MAXIMUM:
778 case TOPIC_ALIAS_MAXIMUM:
779 case TOPIC_ALIAS:
780 final int int2BytesResult = decodeMsbLsb(buffer);
781 numberOfBytesConsumed += 2;
782 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
783 break;
784 case PUBLICATION_EXPIRY_INTERVAL:
785 case SESSION_EXPIRY_INTERVAL:
786 case WILL_DELAY_INTERVAL:
787 case MAXIMUM_PACKET_SIZE:
788 final int maxPacketSize = buffer.readInt();
789 numberOfBytesConsumed += 4;
790 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
791 break;
792 case SUBSCRIPTION_IDENTIFIER:
793 long vbIntegerResult = decodeVariableByteInteger(buffer);
794 numberOfBytesConsumed += unpackB(vbIntegerResult);
795 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
796 break;
797 case CONTENT_TYPE:
798 case RESPONSE_TOPIC:
799 case ASSIGNED_CLIENT_IDENTIFIER:
800 case AUTHENTICATION_METHOD:
801 case RESPONSE_INFORMATION:
802 case SERVER_REFERENCE:
803 case REASON_STRING:
804 final Result<String> stringResult = decodeString(buffer);
805 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
806 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
807 break;
808 case USER_PROPERTY:
809 final Result<String> keyResult = decodeString(buffer);
810 final Result<String> valueResult = decodeString(buffer);
811 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
812 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
813 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
814 break;
815 case CORRELATION_DATA:
816 case AUTHENTICATION_DATA:
817 final byte[] binaryDataResult = decodeByteArray(buffer);
818 numberOfBytesConsumed += binaryDataResult.length + 2;
819 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
820 break;
821 default:
822
823 throw new DecoderException("Unknown property type: " + propertyType);
824 }
825 }
826
827 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
828 }
829 }