1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.internal.ObjectUtil;
28
29 import java.util.ArrayList;
30 import java.util.List;
31
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
40
41
42
43
44
45
46
47
48
49 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
50
51
52
53
54
55
56 enum DecoderState {
57 READ_FIXED_HEADER,
58 READ_VARIABLE_HEADER,
59 READ_PAYLOAD,
60 BAD_MESSAGE,
61 }
62
63 private MqttFixedHeader mqttFixedHeader;
64 private Object variableHeader;
65 private int bytesRemainingInVariablePart;
66
67 private final int maxBytesInMessage;
68 private final int maxClientIdLength;
69
70 public MqttDecoder() {
71 this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
72 }
73
74 public MqttDecoder(int maxBytesInMessage) {
75 this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
76 }
77
78 public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
79 super(DecoderState.READ_FIXED_HEADER);
80 this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
81 this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
82 }
83
84 @Override
85 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
86 switch (state()) {
87 case READ_FIXED_HEADER: try {
88 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
89 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
90 checkpoint(DecoderState.READ_VARIABLE_HEADER);
91
92 } catch (Exception cause) {
93 out.add(invalidMessage(cause));
94 return;
95 }
96
97 case READ_VARIABLE_HEADER: try {
98 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
99 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
100 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
101 buffer.skipBytes(actualReadableBytes());
102 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
103 + bytesRemainingBeforeVariableHeader);
104 }
105 checkpoint(DecoderState.READ_PAYLOAD);
106
107 } catch (Exception cause) {
108 out.add(invalidMessage(cause));
109 return;
110 }
111
112 case READ_PAYLOAD: try {
113 final Object decodedPayload =
114 decodePayload(
115 ctx,
116 buffer,
117 mqttFixedHeader.messageType(),
118 maxClientIdLength,
119 variableHeader);
120 checkpoint(DecoderState.READ_FIXED_HEADER);
121 MqttMessage message = MqttMessageFactory.newMessage(
122 mqttFixedHeader, variableHeader, decodedPayload);
123 mqttFixedHeader = null;
124 variableHeader = null;
125 out.add(message);
126 break;
127 } catch (Exception cause) {
128 out.add(invalidMessage(cause));
129 return;
130 }
131
132 case BAD_MESSAGE:
133
134 buffer.skipBytes(actualReadableBytes());
135 break;
136
137 default:
138
139 throw new Error();
140 }
141 }
142
143 private MqttMessage invalidMessage(Throwable cause) {
144 checkpoint(DecoderState.BAD_MESSAGE);
145 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
146 }
147
148
149
150
151
152
153
154
155
156
157
158 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
159 short b1 = buffer.readUnsignedByte();
160
161 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
162 boolean dupFlag = (b1 & 0x08) == 0x08;
163 int qosLevel = (b1 & 0x06) >> 1;
164 boolean retain = (b1 & 0x01) != 0;
165
166 switch (messageType) {
167 case PUBLISH:
168 if (qosLevel == 3) {
169 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
170 + qosLevel + ')');
171 }
172 break;
173
174 case PUBREL:
175 case SUBSCRIBE:
176 case UNSUBSCRIBE:
177 if (dupFlag) {
178 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
179 + " message, must be 0, found 1");
180 }
181 if (qosLevel != 1) {
182 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
183 + " message, must be 1, found " + qosLevel);
184 }
185 if (retain) {
186 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
187 + " message, must be 0, found 1");
188 }
189 break;
190
191 case AUTH:
192 case CONNACK:
193 case CONNECT:
194 case DISCONNECT:
195 case PINGREQ:
196 case PINGRESP:
197 case PUBACK:
198 case PUBCOMP:
199 case PUBREC:
200 case SUBACK:
201 case UNSUBACK:
202 if (dupFlag) {
203 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
204 + " message, must be 0, found 1");
205 }
206 if (qosLevel != 0) {
207 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
208 + " message, must be 0, found " + qosLevel);
209 }
210 if (retain) {
211 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
212 + " message, must be 0, found 1");
213 }
214 break;
215 default:
216 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
217 }
218
219 int remainingLength = 0;
220 int multiplier = 1;
221 short digit;
222 int loops = 0;
223 do {
224 digit = buffer.readUnsignedByte();
225 remainingLength += (digit & 127) * multiplier;
226 multiplier *= 128;
227 loops++;
228 } while ((digit & 128) != 0 && loops < 4);
229
230
231 if (loops == 4 && (digit & 128) != 0) {
232 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
233 }
234 MqttFixedHeader decodedFixedHeader =
235 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
236 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
237 }
238
239
240
241
242
243
244
245 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
246 switch (mqttFixedHeader.messageType()) {
247 case CONNECT:
248 return decodeConnectionVariableHeader(ctx, buffer);
249
250 case CONNACK:
251 return decodeConnAckVariableHeader(ctx, buffer);
252
253 case UNSUBSCRIBE:
254 case SUBSCRIBE:
255 case SUBACK:
256 case UNSUBACK:
257 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
258
259 case PUBACK:
260 case PUBREC:
261 case PUBCOMP:
262 case PUBREL:
263 return decodePubReplyMessage(buffer);
264
265 case PUBLISH:
266 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
267
268 case DISCONNECT:
269 case AUTH:
270 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
271
272 case PINGREQ:
273 case PINGRESP:
274
275 return null;
276 default:
277
278 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
279 }
280 }
281
282 private MqttConnectVariableHeader decodeConnectionVariableHeader(
283 ChannelHandlerContext ctx,
284 ByteBuf buffer) {
285 final Result<String> protoString = decodeString(buffer);
286 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
287
288 final byte protocolLevel = buffer.readByte();
289 numberOfBytesConsumed += 1;
290
291 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
292 MqttCodecUtil.setMqttVersion(ctx, version);
293
294 final int b1 = buffer.readUnsignedByte();
295 numberOfBytesConsumed += 1;
296
297 final int keepAlive = decodeMsbLsb(buffer);
298 numberOfBytesConsumed += 2;
299
300 final boolean hasUserName = (b1 & 0x80) == 0x80;
301 final boolean hasPassword = (b1 & 0x40) == 0x40;
302 final boolean willRetain = (b1 & 0x20) == 0x20;
303 final int willQos = (b1 & 0x18) >> 3;
304 final boolean willFlag = (b1 & 0x04) == 0x04;
305 final boolean cleanSession = (b1 & 0x02) == 0x02;
306 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
307 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
308 if (!zeroReservedFlag) {
309
310
311
312 throw new DecoderException("non-zero reserved flag");
313 }
314 }
315
316 final MqttProperties properties;
317 if (version == MqttVersion.MQTT_5) {
318 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
319 properties = propertiesResult.value;
320 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
321 } else {
322 properties = MqttProperties.NO_PROPERTIES;
323 }
324
325 bytesRemainingInVariablePart -= numberOfBytesConsumed;
326 return new MqttConnectVariableHeader(
327 version.protocolName(),
328 version.protocolLevel(),
329 hasUserName,
330 hasPassword,
331 willRetain,
332 willQos,
333 willFlag,
334 cleanSession,
335 keepAlive,
336 properties);
337 }
338
339 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
340 ChannelHandlerContext ctx,
341 ByteBuf buffer) {
342 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
343 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
344 byte returnCode = buffer.readByte();
345
346 final MqttProperties properties;
347 if (mqttVersion == MqttVersion.MQTT_5) {
348 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
349 properties = propertiesResult.value;
350 bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
351 } else {
352 properties = MqttProperties.NO_PROPERTIES;
353 bytesRemainingInVariablePart -= 2;
354 }
355
356 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
357 }
358
359 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
360 ChannelHandlerContext ctx,
361 ByteBuf buffer) {
362 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
363 final int packetId = decodeMessageId(buffer);
364
365 if (mqttVersion == MqttVersion.MQTT_5) {
366 final Result<MqttProperties> properties = decodeProperties(buffer);
367 bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
368 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
369 } else {
370 bytesRemainingInVariablePart -= 2;
371 return new MqttMessageIdAndPropertiesVariableHeader(packetId,
372 MqttProperties.NO_PROPERTIES);
373 }
374 }
375
376 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
377 final int packetId = decodeMessageId(buffer);
378
379 final int packetIdNumberOfBytesConsumed = 2;
380 if (bytesRemainingInVariablePart > 3) {
381 final byte reasonCode = buffer.readByte();
382 final Result<MqttProperties> properties = decodeProperties(buffer);
383 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
384 return new MqttPubReplyMessageVariableHeader(packetId,
385 reasonCode,
386 properties.value);
387 } else if (bytesRemainingInVariablePart > 2) {
388 final byte reasonCode = buffer.readByte();
389 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
390 return new MqttPubReplyMessageVariableHeader(packetId,
391 reasonCode,
392 MqttProperties.NO_PROPERTIES);
393 } else {
394 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
395 return new MqttPubReplyMessageVariableHeader(packetId,
396 (byte) 0,
397 MqttProperties.NO_PROPERTIES);
398 }
399 }
400
401 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
402 ByteBuf buffer) {
403 final byte reasonCode;
404 final MqttProperties properties;
405 if (bytesRemainingInVariablePart > 1) {
406 reasonCode = buffer.readByte();
407 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
408 properties = propertiesResult.value;
409 bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
410 } else if (bytesRemainingInVariablePart > 0) {
411 reasonCode = buffer.readByte();
412 properties = MqttProperties.NO_PROPERTIES;
413 --bytesRemainingInVariablePart;
414 } else {
415 reasonCode = 0;
416 properties = MqttProperties.NO_PROPERTIES;
417 }
418
419 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
420 }
421
422 private MqttPublishVariableHeader decodePublishVariableHeader(
423 ChannelHandlerContext ctx,
424 ByteBuf buffer,
425 MqttFixedHeader mqttFixedHeader) {
426 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
427 final Result<String> decodedTopic = decodeString(buffer);
428 if (!isValidPublishTopicName(decodedTopic.value)) {
429 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
430 }
431 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
432
433 int messageId = -1;
434 if (mqttFixedHeader.qosLevel().value() > 0) {
435 messageId = decodeMessageId(buffer);
436 numberOfBytesConsumed += 2;
437 }
438
439 final MqttProperties properties;
440 if (mqttVersion == MqttVersion.MQTT_5) {
441 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
442 properties = propertiesResult.value;
443 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
444 } else {
445 properties = MqttProperties.NO_PROPERTIES;
446 }
447
448 bytesRemainingInVariablePart -= numberOfBytesConsumed;
449 return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
450 }
451
452
453
454
455 private static int decodeMessageId(ByteBuf buffer) {
456 final int messageId = decodeMsbLsb(buffer);
457 if (!isValidMessageId(messageId)) {
458 throw new DecoderException("invalid messageId: " + messageId);
459 }
460 return messageId;
461 }
462
463
464
465
466
467
468
469
470
471 private Object decodePayload(
472 ChannelHandlerContext ctx,
473 ByteBuf buffer,
474 MqttMessageType messageType,
475 int maxClientIdLength,
476 Object variableHeader) {
477 switch (messageType) {
478 case CONNECT:
479 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
480
481 case SUBSCRIBE:
482 return decodeSubscribePayload(buffer);
483
484 case SUBACK:
485 return decodeSubackPayload(buffer);
486
487 case UNSUBSCRIBE:
488 return decodeUnsubscribePayload(buffer);
489
490 case UNSUBACK:
491 return decodeUnsubAckPayload(ctx, buffer);
492
493 case PUBLISH:
494 return decodePublishPayload(buffer);
495
496 default:
497
498 return null;
499 }
500 }
501
502 private MqttConnectPayload decodeConnectionPayload(
503 ByteBuf buffer,
504 int maxClientIdLength,
505 MqttConnectVariableHeader mqttConnectVariableHeader) {
506 final Result<String> decodedClientId = decodeString(buffer);
507 final String decodedClientIdValue = decodedClientId.value;
508 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
509 (byte) mqttConnectVariableHeader.version());
510 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
511 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
512 }
513 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
514
515 Result<String> decodedWillTopic = null;
516 byte[] decodedWillMessage = null;
517
518 final MqttProperties willProperties;
519 if (mqttConnectVariableHeader.isWillFlag()) {
520 if (mqttVersion == MqttVersion.MQTT_5) {
521 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
522 willProperties = propertiesResult.value;
523 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
524 } else {
525 willProperties = MqttProperties.NO_PROPERTIES;
526 }
527 decodedWillTopic = decodeString(buffer, 0, 32767);
528 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
529 decodedWillMessage = decodeByteArray(buffer);
530 numberOfBytesConsumed += decodedWillMessage.length + 2;
531 } else {
532 willProperties = MqttProperties.NO_PROPERTIES;
533 }
534 Result<String> decodedUserName = null;
535 byte[] decodedPassword = null;
536 if (mqttConnectVariableHeader.hasUserName()) {
537 decodedUserName = decodeString(buffer);
538 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
539 }
540 if (mqttConnectVariableHeader.hasPassword()) {
541 decodedPassword = decodeByteArray(buffer);
542 numberOfBytesConsumed += decodedPassword.length + 2;
543 }
544
545 validateNoBytesRemain(numberOfBytesConsumed);
546 return new MqttConnectPayload(
547 decodedClientId.value,
548 willProperties,
549 decodedWillTopic != null ? decodedWillTopic.value : null,
550 decodedWillMessage,
551 decodedUserName != null ? decodedUserName.value : null,
552 decodedPassword);
553 }
554
555 private MqttSubscribePayload decodeSubscribePayload(
556 ByteBuf buffer) {
557 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
558 int numberOfBytesConsumed = 0;
559 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
560 final Result<String> decodedTopicName = decodeString(buffer);
561 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
562
563 final short optionByte = buffer.readUnsignedByte();
564
565 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
566 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
567 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
568 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
569
570 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
571 noLocal,
572 retainAsPublished,
573 retainHandling);
574
575 numberOfBytesConsumed++;
576 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
577 }
578 validateNoBytesRemain(numberOfBytesConsumed);
579 return new MqttSubscribePayload(subscribeTopics);
580 }
581
582 private MqttSubAckPayload decodeSubackPayload(
583 ByteBuf buffer) {
584 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
585 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
586 int numberOfBytesConsumed = 0;
587 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
588 int reasonCode = buffer.readUnsignedByte();
589 numberOfBytesConsumed++;
590 grantedQos.add(reasonCode);
591 }
592 validateNoBytesRemain(numberOfBytesConsumed);
593 return new MqttSubAckPayload(grantedQos);
594 }
595
596 private MqttUnsubAckPayload decodeUnsubAckPayload(
597 ChannelHandlerContext ctx,
598 ByteBuf buffer) {
599 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
600 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
601 int numberOfBytesConsumed = 0;
602 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
603 short reasonCode = buffer.readUnsignedByte();
604 numberOfBytesConsumed++;
605 reasonCodes.add(reasonCode);
606 }
607 validateNoBytesRemain(numberOfBytesConsumed);
608 return new MqttUnsubAckPayload(reasonCodes);
609 }
610
611 private MqttUnsubscribePayload decodeUnsubscribePayload(
612 ByteBuf buffer) {
613 final List<String> unsubscribeTopics = new ArrayList<String>();
614 int numberOfBytesConsumed = 0;
615 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
616 final Result<String> decodedTopicName = decodeString(buffer);
617 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
618 unsubscribeTopics.add(decodedTopicName.value);
619 }
620 validateNoBytesRemain(numberOfBytesConsumed);
621 return new MqttUnsubscribePayload(unsubscribeTopics);
622 }
623
624 private ByteBuf decodePublishPayload(ByteBuf buffer) {
625 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
626 }
627
628 private void validateNoBytesRemain(int numberOfBytesConsumed) {
629 bytesRemainingInVariablePart -= numberOfBytesConsumed;
630 if (bytesRemainingInVariablePart != 0) {
631 throw new DecoderException(
632 "non-zero remaining payload bytes: " +
633 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
634 }
635 }
636
637 private static Result<String> decodeString(ByteBuf buffer) {
638 return decodeString(buffer, 0, Integer.MAX_VALUE);
639 }
640
641 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
642 int size = decodeMsbLsb(buffer);
643 int numberOfBytesConsumed = 2;
644 if (size < minBytes || size > maxBytes) {
645 buffer.skipBytes(size);
646 numberOfBytesConsumed += size;
647 return new Result<String>(null, numberOfBytesConsumed);
648 }
649 String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
650 buffer.skipBytes(size);
651 numberOfBytesConsumed += size;
652 return new Result<String>(s, numberOfBytesConsumed);
653 }
654
655
656
657
658
659 private static byte[] decodeByteArray(ByteBuf buffer) {
660 int size = decodeMsbLsb(buffer);
661 byte[] bytes = new byte[size];
662 buffer.readBytes(bytes);
663 return bytes;
664 }
665
666
667 private static long packInts(int a, int b) {
668 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
669 }
670
671 private static int unpackA(long ints) {
672 return (int) (ints >> 32);
673 }
674
675 private static int unpackB(long ints) {
676 return (int) ints;
677 }
678
679
680
681
682 private static int decodeMsbLsb(ByteBuf buffer) {
683 int min = 0;
684 int max = 65535;
685 short msbSize = buffer.readUnsignedByte();
686 short lsbSize = buffer.readUnsignedByte();
687 int result = msbSize << 8 | lsbSize;
688 if (result < min || result > max) {
689 result = -1;
690 }
691 return result;
692 }
693
694
695
696
697
698
699
700
701 private static long decodeVariableByteInteger(ByteBuf buffer) {
702 int remainingLength = 0;
703 int multiplier = 1;
704 short digit;
705 int loops = 0;
706 do {
707 digit = buffer.readUnsignedByte();
708 remainingLength += (digit & 127) * multiplier;
709 multiplier *= 128;
710 loops++;
711 } while ((digit & 128) != 0 && loops < 4);
712
713 if (loops == 4 && (digit & 128) != 0) {
714 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
715 }
716 return packInts(remainingLength, loops);
717 }
718
719 private static final class Result<T> {
720
721 private final T value;
722 private final int numberOfBytesConsumed;
723
724 Result(T value, int numberOfBytesConsumed) {
725 this.value = value;
726 this.numberOfBytesConsumed = numberOfBytesConsumed;
727 }
728 }
729
730 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
731 final long propertiesLength = decodeVariableByteInteger(buffer);
732 int totalPropertiesLength = unpackA(propertiesLength);
733 int numberOfBytesConsumed = unpackB(propertiesLength);
734
735 MqttProperties decodedProperties = new MqttProperties();
736 while (numberOfBytesConsumed < totalPropertiesLength) {
737 long propertyId = decodeVariableByteInteger(buffer);
738 final int propertyIdValue = unpackA(propertyId);
739 numberOfBytesConsumed += unpackB(propertyId);
740 MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
741 switch (propertyType) {
742 case PAYLOAD_FORMAT_INDICATOR:
743 case REQUEST_PROBLEM_INFORMATION:
744 case REQUEST_RESPONSE_INFORMATION:
745 case MAXIMUM_QOS:
746 case RETAIN_AVAILABLE:
747 case WILDCARD_SUBSCRIPTION_AVAILABLE:
748 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
749 case SHARED_SUBSCRIPTION_AVAILABLE:
750 final int b1 = buffer.readUnsignedByte();
751 numberOfBytesConsumed++;
752 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
753 break;
754 case SERVER_KEEP_ALIVE:
755 case RECEIVE_MAXIMUM:
756 case TOPIC_ALIAS_MAXIMUM:
757 case TOPIC_ALIAS:
758 final int int2BytesResult = decodeMsbLsb(buffer);
759 numberOfBytesConsumed += 2;
760 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
761 break;
762 case PUBLICATION_EXPIRY_INTERVAL:
763 case SESSION_EXPIRY_INTERVAL:
764 case WILL_DELAY_INTERVAL:
765 case MAXIMUM_PACKET_SIZE:
766 final int maxPacketSize = buffer.readInt();
767 numberOfBytesConsumed += 4;
768 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
769 break;
770 case SUBSCRIPTION_IDENTIFIER:
771 long vbIntegerResult = decodeVariableByteInteger(buffer);
772 numberOfBytesConsumed += unpackB(vbIntegerResult);
773 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
774 break;
775 case CONTENT_TYPE:
776 case RESPONSE_TOPIC:
777 case ASSIGNED_CLIENT_IDENTIFIER:
778 case AUTHENTICATION_METHOD:
779 case RESPONSE_INFORMATION:
780 case SERVER_REFERENCE:
781 case REASON_STRING:
782 final Result<String> stringResult = decodeString(buffer);
783 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
784 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
785 break;
786 case USER_PROPERTY:
787 final Result<String> keyResult = decodeString(buffer);
788 final Result<String> valueResult = decodeString(buffer);
789 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
790 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
791 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
792 break;
793 case CORRELATION_DATA:
794 case AUTHENTICATION_DATA:
795 final byte[] binaryDataResult = decodeByteArray(buffer);
796 numberOfBytesConsumed += binaryDataResult.length + 2;
797 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
798 break;
799 default:
800
801 throw new DecoderException("Unknown property type: " + propertyType);
802 }
803 }
804
805 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
806 }
807 }