1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.internal.ObjectUtil;
28
29 import java.util.ArrayList;
30 import java.util.List;
31
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
40
41
42
43
44
45
46
47
48
49 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
50
51
52
53
54
55
/**
 * Phases the decoder moves through while parsing one MQTT control packet.
 */
enum DecoderState {
    /** Waiting for the first header byte and the remaining-length field. */
    READ_FIXED_HEADER,

    /** Fixed header parsed; the type-specific variable header comes next. */
    READ_VARIABLE_HEADER,

    /** Variable header parsed; the type-specific payload comes next. */
    READ_PAYLOAD,

    /** A malformed packet was seen; all further input is discarded. */
    BAD_MESSAGE
}
62
63 private MqttFixedHeader mqttFixedHeader;
64 private Object variableHeader;
65 private int bytesRemainingInVariablePart;
66
67 private final int maxBytesInMessage;
68 private final int maxClientIdLength;
69
/**
 * Creates a decoder that uses the default limits for both the message size and the
 * client identifier length.
 */
public MqttDecoder() {
    this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
73
/**
 * Creates a decoder with a custom message size limit and the default client identifier
 * length limit.
 *
 * @param maxBytesInMessage largest accepted remaining length; must be positive
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
77
/**
 * Creates a decoder with explicit limits. Decoding starts in the
 * {@link DecoderState#READ_FIXED_HEADER} state.
 *
 * @param maxBytesInMessage largest accepted remaining length; must be positive
 * @param maxClientIdLength limit handed to the client identifier validation; must be positive
 */
public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
    super(DecoderState.READ_FIXED_HEADER);
    // Validation order is kept: the message size limit is checked before the client id limit.
    this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
    this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
}
83
84 @Override
85 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
86 switch (state()) {
87 case READ_FIXED_HEADER: try {
88 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
89 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
90 checkpoint(DecoderState.READ_VARIABLE_HEADER);
91
92 } catch (Exception cause) {
93 out.add(invalidMessage(cause));
94 return;
95 }
96
97 case READ_VARIABLE_HEADER: try {
98 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
99 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
100 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
101 buffer.skipBytes(actualReadableBytes());
102 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
103 + bytesRemainingBeforeVariableHeader);
104 }
105 checkpoint(DecoderState.READ_PAYLOAD);
106
107 } catch (Exception cause) {
108 out.add(invalidMessage(cause));
109 return;
110 }
111
112 case READ_PAYLOAD: try {
113 final Object decodedPayload =
114 decodePayload(
115 ctx,
116 buffer,
117 mqttFixedHeader.messageType(),
118 maxClientIdLength,
119 variableHeader);
120 checkpoint(DecoderState.READ_FIXED_HEADER);
121 MqttMessage message = MqttMessageFactory.newMessage(
122 mqttFixedHeader, variableHeader, decodedPayload);
123 mqttFixedHeader = null;
124 variableHeader = null;
125 out.add(message);
126 break;
127 } catch (Exception cause) {
128 out.add(invalidMessage(cause));
129 return;
130 }
131
132 case BAD_MESSAGE:
133
134 buffer.skipBytes(actualReadableBytes());
135 break;
136
137 default:
138
139 throw new Error();
140 }
141 }
142
143 private MqttMessage invalidMessage(Throwable cause) {
144 checkpoint(DecoderState.BAD_MESSAGE);
145 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
146 }
147
148
149
150
151
152
153
154
155
156
157
158 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
159 short b1 = buffer.readUnsignedByte();
160
161 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
162 boolean dupFlag = (b1 & 0x08) == 0x08;
163 int qosLevel = (b1 & 0x06) >> 1;
164 boolean retain = (b1 & 0x01) != 0;
165
166 switch (messageType) {
167 case PUBLISH:
168 if (qosLevel == 3) {
169 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
170 + qosLevel + ')');
171 }
172 break;
173
174 case PUBREL:
175 case SUBSCRIBE:
176 case UNSUBSCRIBE:
177 if (dupFlag) {
178 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
179 + " message, must be 0, found 1");
180 }
181 if (qosLevel != 1) {
182 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
183 + " message, must be 1, found " + qosLevel);
184 }
185 if (retain) {
186 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
187 + " message, must be 0, found 1");
188 }
189 break;
190
191 case AUTH:
192 case CONNACK:
193 case CONNECT:
194 case DISCONNECT:
195 case PINGREQ:
196 case PINGRESP:
197 case PUBACK:
198 case PUBCOMP:
199 case PUBREC:
200 case SUBACK:
201 case UNSUBACK:
202 if (dupFlag) {
203 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
204 + " message, must be 0, found 1");
205 }
206 if (qosLevel != 0) {
207 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
208 + " message, must be 0, found " + qosLevel);
209 }
210 if (retain) {
211 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
212 + " message, must be 0, found 1");
213 }
214 break;
215 default:
216 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
217 }
218
219 int remainingLength = parseRemainingLength(buffer, messageType);
220 MqttFixedHeader decodedFixedHeader =
221 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
222 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
223 }
224
225 private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
226 int remainingLength = 0;
227 int multiplier = 1;
228
229 for (int i = 0; i < 4; i++) {
230 short digit = buffer.readUnsignedByte();
231 remainingLength += (digit & 127) * multiplier;
232
233 if ((digit & 128) == 0) {
234 return remainingLength;
235 }
236
237 multiplier *= 128;
238 }
239
240
241 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
242 }
243
244
245
246
247
248
249
250 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
251 switch (mqttFixedHeader.messageType()) {
252 case CONNECT:
253 return decodeConnectionVariableHeader(ctx, buffer);
254
255 case CONNACK:
256 return decodeConnAckVariableHeader(ctx, buffer);
257
258 case UNSUBSCRIBE:
259 case SUBSCRIBE:
260 case SUBACK:
261 case UNSUBACK:
262 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
263
264 case PUBACK:
265 case PUBREC:
266 case PUBCOMP:
267 case PUBREL:
268 return decodePubReplyMessage(buffer);
269
270 case PUBLISH:
271 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
272
273 case DISCONNECT:
274 case AUTH:
275 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
276
277 case PINGREQ:
278 case PINGRESP:
279
280 return null;
281 default:
282
283 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
284 }
285 }
286
287 private MqttConnectVariableHeader decodeConnectionVariableHeader(
288 ChannelHandlerContext ctx,
289 ByteBuf buffer) {
290 final String protoString = decodeStringAndDecreaseBytesRemaining(buffer);
291
292 final byte protocolLevel = buffer.readByte();
293 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString, protocolLevel);
294 MqttCodecUtil.setMqttVersion(ctx, version);
295
296 final int b1 = buffer.readUnsignedByte();
297 final int keepAlive = decodeMsbLsb(buffer);
298 final boolean hasUserName = (b1 & 0x80) == 0x80;
299 final boolean hasPassword = (b1 & 0x40) == 0x40;
300 final boolean willRetain = (b1 & 0x20) == 0x20;
301 final int willQos = (b1 & 0x18) >> 3;
302 final boolean willFlag = (b1 & 0x04) == 0x04;
303 final boolean cleanSession = (b1 & 0x02) == 0x02;
304 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
305 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
306 if (!zeroReservedFlag) {
307
308
309
310 throw new DecoderException("non-zero reserved flag");
311 }
312 }
313
314 final MqttProperties properties = version == MqttVersion.MQTT_5
315 ? decodeProperties(buffer)
316 : MqttProperties.NO_PROPERTIES;
317
318 bytesRemainingInVariablePart -= 4;
319 return new MqttConnectVariableHeader(
320 version.protocolName(),
321 version.protocolLevel(),
322 hasUserName,
323 hasPassword,
324 willRetain,
325 willQos,
326 willFlag,
327 cleanSession,
328 keepAlive,
329 properties);
330 }
331
332 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
333 ChannelHandlerContext ctx,
334 ByteBuf buffer) {
335 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
336 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
337 byte returnCode = buffer.readByte();
338
339 bytesRemainingInVariablePart -= 2;
340 final MqttProperties properties = mqttVersion == MqttVersion.MQTT_5
341 ? decodeProperties(buffer)
342 : MqttProperties.NO_PROPERTIES;
343
344 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
345 }
346
347 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
348 ChannelHandlerContext ctx,
349 ByteBuf buffer) {
350 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
351 final int packetId = decodeMessageId(buffer);
352
353 bytesRemainingInVariablePart -= 2;
354 MqttProperties properties = mqttVersion == MqttVersion.MQTT_5
355 ? decodeProperties(buffer)
356 : MqttProperties.NO_PROPERTIES;
357 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties);
358 }
359
360 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
361 final int packetId = decodeMessageId(buffer);
362
363 final int packetIdNumberOfBytesConsumed = 2;
364 if (bytesRemainingInVariablePart > 3) {
365 final byte reasonCode = buffer.readByte();
366 final MqttProperties properties = decodeProperties(buffer);
367 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
368 return new MqttPubReplyMessageVariableHeader(packetId,
369 reasonCode,
370 properties);
371 } else if (bytesRemainingInVariablePart > 2) {
372 final byte reasonCode = buffer.readByte();
373 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
374 return new MqttPubReplyMessageVariableHeader(packetId,
375 reasonCode,
376 MqttProperties.NO_PROPERTIES);
377 } else {
378 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
379 return new MqttPubReplyMessageVariableHeader(packetId,
380 (byte) 0,
381 MqttProperties.NO_PROPERTIES);
382 }
383 }
384
385 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
386 ByteBuf buffer) {
387 final byte reasonCode;
388 final MqttProperties properties;
389 if (bytesRemainingInVariablePart > 1) {
390 reasonCode = buffer.readByte();
391 properties = decodeProperties(buffer);
392 --bytesRemainingInVariablePart;
393 } else if (bytesRemainingInVariablePart > 0) {
394 reasonCode = buffer.readByte();
395 properties = MqttProperties.NO_PROPERTIES;
396 --bytesRemainingInVariablePart;
397 } else {
398 reasonCode = 0;
399 properties = MqttProperties.NO_PROPERTIES;
400 }
401
402 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
403 }
404
405 private MqttPublishVariableHeader decodePublishVariableHeader(
406 ChannelHandlerContext ctx,
407 ByteBuf buffer,
408 MqttFixedHeader mqttFixedHeader) {
409 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
410 final String decodedTopic = decodeStringAndDecreaseBytesRemaining(buffer);
411 if (!isValidPublishTopicName(decodedTopic)) {
412 throw new DecoderException("invalid publish topic name: " + decodedTopic + " (contains wildcards)");
413 }
414
415 int messageId = -1;
416 if (mqttFixedHeader.qosLevel().value() > 0) {
417 messageId = decodeMessageId(buffer);
418 bytesRemainingInVariablePart -= 2;
419 }
420
421 final MqttProperties properties = mqttVersion == MqttVersion.MQTT_5
422 ? decodeProperties(buffer)
423 : MqttProperties.NO_PROPERTIES;
424
425 return new MqttPublishVariableHeader(decodedTopic, messageId, properties);
426 }
427
428
429
430
431 private static int decodeMessageId(ByteBuf buffer) {
432 final int messageId = decodeMsbLsb(buffer);
433 if (!isValidMessageId(messageId)) {
434 throw new DecoderException("invalid messageId: " + messageId);
435 }
436 return messageId;
437 }
438
439
440
441
442
443
444
445
446
447 private Object decodePayload(
448 ChannelHandlerContext ctx,
449 ByteBuf buffer,
450 MqttMessageType messageType,
451 int maxClientIdLength,
452 Object variableHeader) {
453 switch (messageType) {
454 case CONNECT:
455 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
456
457 case SUBSCRIBE:
458 return decodeSubscribePayload(buffer);
459
460 case SUBACK:
461 return decodeSubackPayload(buffer);
462
463 case UNSUBSCRIBE:
464 return decodeUnsubscribePayload(buffer);
465
466 case UNSUBACK:
467 return decodeUnsubAckPayload(ctx, buffer);
468
469 case PUBLISH:
470 return decodePublishPayload(buffer);
471
472 default:
473
474 return null;
475 }
476 }
477
478 private MqttConnectPayload decodeConnectionPayload(
479 ByteBuf buffer,
480 int maxClientIdLength,
481 MqttConnectVariableHeader mqttConnectVariableHeader) {
482 String decodedClientId = decodeStringAndDecreaseBytesRemaining(buffer);
483 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
484 (byte) mqttConnectVariableHeader.version());
485 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientId)) {
486 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientId);
487 }
488
489 String decodedWillTopic = null;
490 byte[] decodedWillMessage = null;
491
492 int numberOfBytesConsumed = 0;
493 final MqttProperties willProperties;
494 if (mqttConnectVariableHeader.isWillFlag()) {
495 if (mqttVersion == MqttVersion.MQTT_5) {
496 willProperties = decodeProperties(buffer);
497 } else {
498 willProperties = MqttProperties.NO_PROPERTIES;
499 }
500
501 int willTopicSize = decodeMsbLsb(buffer);
502 numberOfBytesConsumed += 2 + willTopicSize;
503 if (willTopicSize <= 32767) {
504 decodedWillTopic = buffer.toString(buffer.readerIndex(), willTopicSize, CharsetUtil.UTF_8);
505 }
506 buffer.skipBytes(willTopicSize);
507
508 decodedWillMessage = decodeByteArray(buffer);
509 numberOfBytesConsumed += decodedWillMessage.length + 2;
510 } else {
511 willProperties = MqttProperties.NO_PROPERTIES;
512 }
513 String decodedUserName = null;
514 byte[] decodedPassword = null;
515 if (mqttConnectVariableHeader.hasUserName()) {
516 decodedUserName = decodeStringAndDecreaseBytesRemaining(buffer);
517 }
518 if (mqttConnectVariableHeader.hasPassword()) {
519 decodedPassword = decodeByteArray(buffer);
520 numberOfBytesConsumed += decodedPassword.length + 2;
521 }
522
523 validateNoBytesRemain(numberOfBytesConsumed);
524 return new MqttConnectPayload(
525 decodedClientId,
526 willProperties,
527 decodedWillTopic,
528 decodedWillMessage,
529 decodedUserName,
530 decodedPassword);
531 }
532
533 private MqttSubscribePayload decodeSubscribePayload(
534 ByteBuf buffer) {
535 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
536 int numberOfBytesConsumed = 0;
537 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
538 int topicNameSize = decodeMsbLsb(buffer);
539 String decodedTopicName = buffer.toString(buffer.readerIndex(), topicNameSize, CharsetUtil.UTF_8);
540 buffer.skipBytes(topicNameSize);
541 numberOfBytesConsumed += 2 + topicNameSize;
542
543
544 final short optionByte = buffer.readUnsignedByte();
545
546 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
547 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
548 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
549 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
550
551 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
552 noLocal,
553 retainAsPublished,
554 retainHandling);
555
556 numberOfBytesConsumed++;
557 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName, subscriptionOption));
558 }
559 validateNoBytesRemain(numberOfBytesConsumed);
560 return new MqttSubscribePayload(subscribeTopics);
561 }
562
563 private MqttSubAckPayload decodeSubackPayload(
564 ByteBuf buffer) {
565 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
566 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
567 int numberOfBytesConsumed = 0;
568 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
569 int reasonCode = buffer.readUnsignedByte();
570 numberOfBytesConsumed++;
571 grantedQos.add(reasonCode);
572 }
573 validateNoBytesRemain(numberOfBytesConsumed);
574 return new MqttSubAckPayload(grantedQos);
575 }
576
577 private MqttUnsubAckPayload decodeUnsubAckPayload(
578 ChannelHandlerContext ctx,
579 ByteBuf buffer) {
580 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
581 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
582 int numberOfBytesConsumed = 0;
583 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
584 short reasonCode = buffer.readUnsignedByte();
585 numberOfBytesConsumed++;
586 reasonCodes.add(reasonCode);
587 }
588 validateNoBytesRemain(numberOfBytesConsumed);
589 return new MqttUnsubAckPayload(reasonCodes);
590 }
591
592 private MqttUnsubscribePayload decodeUnsubscribePayload(
593 ByteBuf buffer) {
594 final List<String> unsubscribeTopics = new ArrayList<String>();
595 while (bytesRemainingInVariablePart > 0) {
596 final String decodedTopicName = decodeStringAndDecreaseBytesRemaining(buffer);
597 unsubscribeTopics.add(decodedTopicName);
598 }
599 return new MqttUnsubscribePayload(unsubscribeTopics);
600 }
601
602 private ByteBuf decodePublishPayload(ByteBuf buffer) {
603 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
604 }
605
606 private void validateNoBytesRemain(int numberOfBytesConsumed) {
607 bytesRemainingInVariablePart -= numberOfBytesConsumed;
608 if (bytesRemainingInVariablePart != 0) {
609 throw new DecoderException(
610 "non-zero remaining payload bytes: " +
611 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
612 }
613 }
614
615 private String decodeStringAndDecreaseBytesRemaining(ByteBuf buffer) {
616 int size = decodeMsbLsb(buffer);
617 bytesRemainingInVariablePart -= 2 + size;
618 String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
619 buffer.skipBytes(size);
620 return s;
621 }
622
623
624
625
626
627 private static byte[] decodeByteArray(ByteBuf buffer) {
628 int size = decodeMsbLsb(buffer);
629 byte[] bytes = new byte[size];
630 buffer.readBytes(bytes);
631 return bytes;
632 }
633
634
635 private static long packInts(int a, int b) {
636 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
637 }
638
639 private static int unpackA(long ints) {
640 return (int) (ints >> 32);
641 }
642
643 private static int unpackB(long ints) {
644 return (int) ints;
645 }
646
647
648
649
650 private static int decodeMsbLsb(ByteBuf buffer) {
651 int min = 0;
652 int max = 65535;
653 short msbSize = buffer.readUnsignedByte();
654 short lsbSize = buffer.readUnsignedByte();
655 int result = msbSize << 8 | lsbSize;
656 if (result < min || result > max) {
657 result = -1;
658 }
659 return result;
660 }
661
662
663
664
665
666
667
668
669 private static long decodeVariableByteInteger(ByteBuf buffer) {
670 int remainingLength = 0;
671 int multiplier = 1;
672
673 for (int i = 0; i < 4; i++) {
674 short digit = buffer.readUnsignedByte();
675 remainingLength += (digit & 127) * multiplier;
676
677 if ((digit & 128) == 0) {
678 return packInts(remainingLength, i + 1);
679 }
680
681 multiplier *= 128;
682 }
683
684 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
685 }
686
687 private MqttProperties decodeProperties(ByteBuf buffer) {
688 final long propertiesLength = decodeVariableByteInteger(buffer);
689 int totalPropertiesLength = unpackA(propertiesLength);
690 int numberOfBytesConsumed = unpackB(propertiesLength);
691
692 MqttProperties decodedProperties = new MqttProperties();
693 while (numberOfBytesConsumed < totalPropertiesLength) {
694 long propertyId = decodeVariableByteInteger(buffer);
695 final int propertyIdValue = unpackA(propertyId);
696 numberOfBytesConsumed += unpackB(propertyId);
697 MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
698 switch (propertyType) {
699 case PAYLOAD_FORMAT_INDICATOR:
700 case REQUEST_PROBLEM_INFORMATION:
701 case REQUEST_RESPONSE_INFORMATION:
702 case MAXIMUM_QOS:
703 case RETAIN_AVAILABLE:
704 case WILDCARD_SUBSCRIPTION_AVAILABLE:
705 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
706 case SHARED_SUBSCRIPTION_AVAILABLE:
707 final int b1 = buffer.readUnsignedByte();
708 numberOfBytesConsumed++;
709 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
710 break;
711 case SERVER_KEEP_ALIVE:
712 case RECEIVE_MAXIMUM:
713 case TOPIC_ALIAS_MAXIMUM:
714 case TOPIC_ALIAS:
715 final int int2BytesResult = decodeMsbLsb(buffer);
716 numberOfBytesConsumed += 2;
717 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
718 break;
719 case PUBLICATION_EXPIRY_INTERVAL:
720 case SESSION_EXPIRY_INTERVAL:
721 case WILL_DELAY_INTERVAL:
722 case MAXIMUM_PACKET_SIZE:
723 final int maxPacketSize = buffer.readInt();
724 numberOfBytesConsumed += 4;
725 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
726 break;
727 case SUBSCRIPTION_IDENTIFIER:
728 long vbIntegerResult = decodeVariableByteInteger(buffer);
729 numberOfBytesConsumed += unpackB(vbIntegerResult);
730 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
731 break;
732 case CONTENT_TYPE:
733 case RESPONSE_TOPIC:
734 case ASSIGNED_CLIENT_IDENTIFIER:
735 case AUTHENTICATION_METHOD:
736 case RESPONSE_INFORMATION:
737 case SERVER_REFERENCE:
738 case REASON_STRING:
739 int size = decodeMsbLsb(buffer);
740 numberOfBytesConsumed += 2 + size;
741 String string = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
742 buffer.skipBytes(size);
743
744 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, string));
745 break;
746 case USER_PROPERTY:
747 int keySize = decodeMsbLsb(buffer);
748 String key = buffer.toString(buffer.readerIndex(), keySize, CharsetUtil.UTF_8);
749 buffer.skipBytes(keySize);
750
751 int valueSize = decodeMsbLsb(buffer);
752 String value = buffer.toString(buffer.readerIndex(), valueSize, CharsetUtil.UTF_8);
753 buffer.skipBytes(valueSize);
754
755 numberOfBytesConsumed += 4 + keySize + valueSize;
756 decodedProperties.add(new MqttProperties.UserProperty(key, value));
757 break;
758 case CORRELATION_DATA:
759 case AUTHENTICATION_DATA:
760 final byte[] binaryDataResult = decodeByteArray(buffer);
761 numberOfBytesConsumed += binaryDataResult.length + 2;
762 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
763 break;
764 default:
765
766 throw new DecoderException("Unknown property type: " + propertyType);
767 }
768 }
769
770 bytesRemainingInVariablePart -= numberOfBytesConsumed;
771 return decodedProperties;
772 }
773 }