1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.Signal;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.ArrayList;
31 import java.util.List;
32
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
41
42
43
44
45
46
47
48
49
50 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
51
52
53
54
55
56
/**
 * States of the decoder; each one names the part of an MQTT packet that
 * {@code decode} attempts to read next.
 */
enum DecoderState {
    /** The fixed header is read next. */
    READ_FIXED_HEADER,
    /** The variable header is read next. */
    READ_VARIABLE_HEADER,
    /** The payload is read next. */
    READ_PAYLOAD,
    /** A decoding failure was reported; all further input is skipped. */
    BAD_MESSAGE
}
63
64 private MqttFixedHeader mqttFixedHeader;
65 private Object variableHeader;
66 private int bytesRemainingInVariablePart;
67
68 private final int maxBytesInMessage;
69 private final int maxClientIdLength;
70
/**
 * Creates a decoder limited by {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 */
public MqttDecoder() {
    // The single-argument constructor supplies the default client id length.
    this(DEFAULT_MAX_BYTES_IN_MESSAGE);
}
74
/**
 * Creates a decoder with a custom message size limit and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH} as the client identifier limit.
 *
 * @param maxBytesInMessage largest accepted remaining length; must be positive
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
78
79 public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
80 super(DecoderState.READ_FIXED_HEADER);
81 this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
82 this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
83 }
84
85 @Override
86 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
87 switch (state()) {
88 case READ_FIXED_HEADER: try {
89 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
90 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
91 checkpoint(DecoderState.READ_VARIABLE_HEADER);
92
93 } catch (Exception cause) {
94 out.add(invalidMessage(cause));
95 return;
96 }
97
98 case READ_VARIABLE_HEADER: try {
99 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
100 boolean bailOut = false;
101 try {
102 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
103 } catch (Signal signal) {
104 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
105
106
107
108 bailOut = true;
109 } else {
110
111 throw signal;
112 }
113 }
114 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
115 buffer.skipBytes(actualReadableBytes());
116 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
117 + bytesRemainingBeforeVariableHeader);
118 }
119 checkpoint(DecoderState.READ_PAYLOAD);
120
121 } catch (Exception cause) {
122 out.add(invalidMessage(cause));
123 return;
124 }
125
126 case READ_PAYLOAD: try {
127 final Object decodedPayload =
128 decodePayload(
129 ctx,
130 buffer,
131 mqttFixedHeader.messageType(),
132 maxClientIdLength,
133 variableHeader);
134 checkpoint(DecoderState.READ_FIXED_HEADER);
135 MqttMessage message = MqttMessageFactory.newMessage(
136 mqttFixedHeader, variableHeader, decodedPayload);
137 mqttFixedHeader = null;
138 variableHeader = null;
139 out.add(message);
140 break;
141 } catch (Exception cause) {
142 out.add(invalidMessage(cause));
143 return;
144 }
145
146 case BAD_MESSAGE:
147
148 buffer.skipBytes(actualReadableBytes());
149 break;
150
151 default:
152
153 throw new Error();
154 }
155 }
156
157 private MqttMessage invalidMessage(Throwable cause) {
158 checkpoint(DecoderState.BAD_MESSAGE);
159 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
160 }
161
162
163
164
165
166
167
168
169
170
171
172 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
173 short b1 = buffer.readUnsignedByte();
174
175 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
176 boolean dupFlag = (b1 & 0x08) == 0x08;
177 int qosLevel = (b1 & 0x06) >> 1;
178 boolean retain = (b1 & 0x01) != 0;
179
180 switch (messageType) {
181 case PUBLISH:
182 if (qosLevel == 3) {
183 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
184 + qosLevel + ')');
185 }
186 break;
187
188 case PUBREL:
189 case SUBSCRIBE:
190 case UNSUBSCRIBE:
191 if (dupFlag) {
192 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
193 + " message, must be 0, found 1");
194 }
195 if (qosLevel != 1) {
196 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
197 + " message, must be 1, found " + qosLevel);
198 }
199 if (retain) {
200 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
201 + " message, must be 0, found 1");
202 }
203 break;
204
205 case AUTH:
206 case CONNACK:
207 case CONNECT:
208 case DISCONNECT:
209 case PINGREQ:
210 case PINGRESP:
211 case PUBACK:
212 case PUBCOMP:
213 case PUBREC:
214 case SUBACK:
215 case UNSUBACK:
216 if (dupFlag) {
217 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
218 + " message, must be 0, found 1");
219 }
220 if (qosLevel != 0) {
221 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
222 + " message, must be 0, found " + qosLevel);
223 }
224 if (retain) {
225 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
226 + " message, must be 0, found 1");
227 }
228 break;
229 default:
230 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
231 }
232
233 int remainingLength = 0;
234 int multiplier = 1;
235 short digit;
236 int loops = 0;
237 do {
238 digit = buffer.readUnsignedByte();
239 remainingLength += (digit & 127) * multiplier;
240 multiplier *= 128;
241 loops++;
242 } while ((digit & 128) != 0 && loops < 4);
243
244
245 if (loops == 4 && (digit & 128) != 0) {
246 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
247 }
248 MqttFixedHeader decodedFixedHeader =
249 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
250 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
251 }
252
253
254
255
256
257
258
259 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
260 switch (mqttFixedHeader.messageType()) {
261 case CONNECT:
262 return decodeConnectionVariableHeader(ctx, buffer);
263
264 case CONNACK:
265 return decodeConnAckVariableHeader(ctx, buffer);
266
267 case UNSUBSCRIBE:
268 case SUBSCRIBE:
269 case SUBACK:
270 case UNSUBACK:
271 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
272
273 case PUBACK:
274 case PUBREC:
275 case PUBCOMP:
276 case PUBREL:
277 return decodePubReplyMessage(buffer);
278
279 case PUBLISH:
280 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
281
282 case DISCONNECT:
283 case AUTH:
284 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
285
286 case PINGREQ:
287 case PINGRESP:
288
289 return null;
290 default:
291
292 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
293 }
294 }
295
296 private MqttConnectVariableHeader decodeConnectionVariableHeader(
297 ChannelHandlerContext ctx,
298 ByteBuf buffer) {
299 final Result<String> protoString = decodeString(buffer);
300 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
301
302 final byte protocolLevel = buffer.readByte();
303 numberOfBytesConsumed += 1;
304
305 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
306 MqttCodecUtil.setMqttVersion(ctx, version);
307
308 final int b1 = buffer.readUnsignedByte();
309 numberOfBytesConsumed += 1;
310
311 final int keepAlive = decodeMsbLsb(buffer);
312 numberOfBytesConsumed += 2;
313
314 final boolean hasUserName = (b1 & 0x80) == 0x80;
315 final boolean hasPassword = (b1 & 0x40) == 0x40;
316 final boolean willRetain = (b1 & 0x20) == 0x20;
317 final int willQos = (b1 & 0x18) >> 3;
318 final boolean willFlag = (b1 & 0x04) == 0x04;
319 final boolean cleanSession = (b1 & 0x02) == 0x02;
320 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
321 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
322 if (!zeroReservedFlag) {
323
324
325
326 throw new DecoderException("non-zero reserved flag");
327 }
328 }
329
330 final MqttProperties properties;
331 if (version == MqttVersion.MQTT_5) {
332 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
333 properties = propertiesResult.value;
334 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
335 } else {
336 properties = MqttProperties.NO_PROPERTIES;
337 }
338
339 bytesRemainingInVariablePart -= numberOfBytesConsumed;
340 return new MqttConnectVariableHeader(
341 version.protocolName(),
342 version.protocolLevel(),
343 hasUserName,
344 hasPassword,
345 willRetain,
346 willQos,
347 willFlag,
348 cleanSession,
349 keepAlive,
350 properties);
351 }
352
353 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
354 ChannelHandlerContext ctx,
355 ByteBuf buffer) {
356 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
357 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
358 byte returnCode = buffer.readByte();
359
360 final MqttProperties properties;
361 if (mqttVersion == MqttVersion.MQTT_5) {
362 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
363 properties = propertiesResult.value;
364 bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
365 } else {
366 properties = MqttProperties.NO_PROPERTIES;
367 bytesRemainingInVariablePart -= 2;
368 }
369
370 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
371 }
372
373 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
374 ChannelHandlerContext ctx,
375 ByteBuf buffer) {
376 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
377 final int packetId = decodeMessageId(buffer);
378
379 if (mqttVersion == MqttVersion.MQTT_5) {
380 final Result<MqttProperties> properties = decodeProperties(buffer);
381 bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
382 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
383 } else {
384 bytesRemainingInVariablePart -= 2;
385 return new MqttMessageIdAndPropertiesVariableHeader(packetId,
386 MqttProperties.NO_PROPERTIES);
387 }
388 }
389
390 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
391 final int packetId = decodeMessageId(buffer);
392
393 final int packetIdNumberOfBytesConsumed = 2;
394 if (bytesRemainingInVariablePart > 3) {
395 final byte reasonCode = buffer.readByte();
396 final Result<MqttProperties> properties = decodeProperties(buffer);
397 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
398 return new MqttPubReplyMessageVariableHeader(packetId,
399 reasonCode,
400 properties.value);
401 } else if (bytesRemainingInVariablePart > 2) {
402 final byte reasonCode = buffer.readByte();
403 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
404 return new MqttPubReplyMessageVariableHeader(packetId,
405 reasonCode,
406 MqttProperties.NO_PROPERTIES);
407 } else {
408 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
409 return new MqttPubReplyMessageVariableHeader(packetId,
410 (byte) 0,
411 MqttProperties.NO_PROPERTIES);
412 }
413 }
414
415 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
416 ByteBuf buffer) {
417 final byte reasonCode;
418 final MqttProperties properties;
419 if (bytesRemainingInVariablePart > 1) {
420 reasonCode = buffer.readByte();
421 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
422 properties = propertiesResult.value;
423 bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
424 } else if (bytesRemainingInVariablePart > 0) {
425 reasonCode = buffer.readByte();
426 properties = MqttProperties.NO_PROPERTIES;
427 --bytesRemainingInVariablePart;
428 } else {
429 reasonCode = 0;
430 properties = MqttProperties.NO_PROPERTIES;
431 }
432
433 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
434 }
435
436 private MqttPublishVariableHeader decodePublishVariableHeader(
437 ChannelHandlerContext ctx,
438 ByteBuf buffer,
439 MqttFixedHeader mqttFixedHeader) {
440 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
441 final Result<String> decodedTopic = decodeString(buffer);
442 if (!isValidPublishTopicName(decodedTopic.value)) {
443 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
444 }
445 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
446
447 int messageId = -1;
448 if (mqttFixedHeader.qosLevel().value() > 0) {
449 messageId = decodeMessageId(buffer);
450 numberOfBytesConsumed += 2;
451 }
452
453 final MqttProperties properties;
454 if (mqttVersion == MqttVersion.MQTT_5) {
455 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
456 properties = propertiesResult.value;
457 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
458 } else {
459 properties = MqttProperties.NO_PROPERTIES;
460 }
461
462 bytesRemainingInVariablePart -= numberOfBytesConsumed;
463 return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
464 }
465
466
467
468
469 private static int decodeMessageId(ByteBuf buffer) {
470 final int messageId = decodeMsbLsb(buffer);
471 if (!isValidMessageId(messageId)) {
472 throw new DecoderException("invalid messageId: " + messageId);
473 }
474 return messageId;
475 }
476
477
478
479
480
481
482
483
484
485 private Object decodePayload(
486 ChannelHandlerContext ctx,
487 ByteBuf buffer,
488 MqttMessageType messageType,
489 int maxClientIdLength,
490 Object variableHeader) {
491 switch (messageType) {
492 case CONNECT:
493 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
494
495 case SUBSCRIBE:
496 return decodeSubscribePayload(buffer);
497
498 case SUBACK:
499 return decodeSubackPayload(buffer);
500
501 case UNSUBSCRIBE:
502 return decodeUnsubscribePayload(buffer);
503
504 case UNSUBACK:
505 return decodeUnsubAckPayload(ctx, buffer);
506
507 case PUBLISH:
508 return decodePublishPayload(buffer);
509
510 default:
511
512 return null;
513 }
514 }
515
516 private MqttConnectPayload decodeConnectionPayload(
517 ByteBuf buffer,
518 int maxClientIdLength,
519 MqttConnectVariableHeader mqttConnectVariableHeader) {
520 final Result<String> decodedClientId = decodeString(buffer);
521 final String decodedClientIdValue = decodedClientId.value;
522 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
523 (byte) mqttConnectVariableHeader.version());
524 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
525 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
526 }
527 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
528
529 Result<String> decodedWillTopic = null;
530 byte[] decodedWillMessage = null;
531
532 final MqttProperties willProperties;
533 if (mqttConnectVariableHeader.isWillFlag()) {
534 if (mqttVersion == MqttVersion.MQTT_5) {
535 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
536 willProperties = propertiesResult.value;
537 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
538 } else {
539 willProperties = MqttProperties.NO_PROPERTIES;
540 }
541 decodedWillTopic = decodeString(buffer, 0, 32767);
542 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
543 decodedWillMessage = decodeByteArray(buffer);
544 numberOfBytesConsumed += decodedWillMessage.length + 2;
545 } else {
546 willProperties = MqttProperties.NO_PROPERTIES;
547 }
548 Result<String> decodedUserName = null;
549 byte[] decodedPassword = null;
550 if (mqttConnectVariableHeader.hasUserName()) {
551 decodedUserName = decodeString(buffer);
552 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
553 }
554 if (mqttConnectVariableHeader.hasPassword()) {
555 decodedPassword = decodeByteArray(buffer);
556 numberOfBytesConsumed += decodedPassword.length + 2;
557 }
558
559 validateNoBytesRemain(numberOfBytesConsumed);
560 return new MqttConnectPayload(
561 decodedClientId.value,
562 willProperties,
563 decodedWillTopic != null ? decodedWillTopic.value : null,
564 decodedWillMessage,
565 decodedUserName != null ? decodedUserName.value : null,
566 decodedPassword);
567 }
568
569 private MqttSubscribePayload decodeSubscribePayload(
570 ByteBuf buffer) {
571 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
572 int numberOfBytesConsumed = 0;
573 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
574 final Result<String> decodedTopicName = decodeString(buffer);
575 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
576
577 final short optionByte = buffer.readUnsignedByte();
578
579 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
580 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
581 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
582 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
583
584 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
585 noLocal,
586 retainAsPublished,
587 retainHandling);
588
589 numberOfBytesConsumed++;
590 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
591 }
592 validateNoBytesRemain(numberOfBytesConsumed);
593 return new MqttSubscribePayload(subscribeTopics);
594 }
595
596 private MqttSubAckPayload decodeSubackPayload(
597 ByteBuf buffer) {
598 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
599 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
600 int numberOfBytesConsumed = 0;
601 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
602 int reasonCode = buffer.readUnsignedByte();
603 numberOfBytesConsumed++;
604 grantedQos.add(reasonCode);
605 }
606 validateNoBytesRemain(numberOfBytesConsumed);
607 return new MqttSubAckPayload(grantedQos);
608 }
609
610 private MqttUnsubAckPayload decodeUnsubAckPayload(
611 ChannelHandlerContext ctx,
612 ByteBuf buffer) {
613 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
614 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
615 int numberOfBytesConsumed = 0;
616 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
617 short reasonCode = buffer.readUnsignedByte();
618 numberOfBytesConsumed++;
619 reasonCodes.add(reasonCode);
620 }
621 validateNoBytesRemain(numberOfBytesConsumed);
622 return new MqttUnsubAckPayload(reasonCodes);
623 }
624
625 private MqttUnsubscribePayload decodeUnsubscribePayload(
626 ByteBuf buffer) {
627 final List<String> unsubscribeTopics = new ArrayList<String>();
628 int numberOfBytesConsumed = 0;
629 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
630 final Result<String> decodedTopicName = decodeString(buffer);
631 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
632 unsubscribeTopics.add(decodedTopicName.value);
633 }
634 validateNoBytesRemain(numberOfBytesConsumed);
635 return new MqttUnsubscribePayload(unsubscribeTopics);
636 }
637
638 private ByteBuf decodePublishPayload(ByteBuf buffer) {
639 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
640 }
641
642 private void validateNoBytesRemain(int numberOfBytesConsumed) {
643 bytesRemainingInVariablePart -= numberOfBytesConsumed;
644 if (bytesRemainingInVariablePart != 0) {
645 throw new DecoderException(
646 "non-zero remaining payload bytes: " +
647 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
648 }
649 }
650
651 private static Result<String> decodeString(ByteBuf buffer) {
652 return decodeString(buffer, 0, Integer.MAX_VALUE);
653 }
654
655 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
656 int size = decodeMsbLsb(buffer);
657 int numberOfBytesConsumed = 2;
658 if (size < minBytes || size > maxBytes) {
659 buffer.skipBytes(size);
660 numberOfBytesConsumed += size;
661 return new Result<String>(null, numberOfBytesConsumed);
662 }
663 String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
664 buffer.skipBytes(size);
665 numberOfBytesConsumed += size;
666 return new Result<String>(s, numberOfBytesConsumed);
667 }
668
669
670
671
672
673 private static byte[] decodeByteArray(ByteBuf buffer) {
674 int size = decodeMsbLsb(buffer);
675 byte[] bytes = new byte[size];
676 buffer.readBytes(bytes);
677 return bytes;
678 }
679
680
681 private static long packInts(int a, int b) {
682 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
683 }
684
685 private static int unpackA(long ints) {
686 return (int) (ints >> 32);
687 }
688
689 private static int unpackB(long ints) {
690 return (int) ints;
691 }
692
693
694
695
696 private static int decodeMsbLsb(ByteBuf buffer) {
697 int min = 0;
698 int max = 65535;
699 short msbSize = buffer.readUnsignedByte();
700 short lsbSize = buffer.readUnsignedByte();
701 int result = msbSize << 8 | lsbSize;
702 if (result < min || result > max) {
703 result = -1;
704 }
705 return result;
706 }
707
708
709
710
711
712
713
714
715 private static long decodeVariableByteInteger(ByteBuf buffer) {
716 int remainingLength = 0;
717 int multiplier = 1;
718 short digit;
719 int loops = 0;
720 do {
721 digit = buffer.readUnsignedByte();
722 remainingLength += (digit & 127) * multiplier;
723 multiplier *= 128;
724 loops++;
725 } while ((digit & 128) != 0 && loops < 4);
726
727 if (loops == 4 && (digit & 128) != 0) {
728 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
729 }
730 return packInts(remainingLength, loops);
731 }
732
733 private static final class Result<T> {
734
735 private final T value;
736 private final int numberOfBytesConsumed;
737
738 Result(T value, int numberOfBytesConsumed) {
739 this.value = value;
740 this.numberOfBytesConsumed = numberOfBytesConsumed;
741 }
742 }
743
744 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
745 final long propertiesLength = decodeVariableByteInteger(buffer);
746 int totalPropertiesLength = unpackA(propertiesLength);
747 int numberOfBytesConsumed = unpackB(propertiesLength);
748 if (buffer.readableBytes() < totalPropertiesLength) {
749
750 buffer.readSlice(totalPropertiesLength);
751 }
752
753 MqttProperties decodedProperties = new MqttProperties();
754 while (numberOfBytesConsumed < totalPropertiesLength) {
755 long propertyId = decodeVariableByteInteger(buffer);
756 final int propertyIdValue = unpackA(propertyId);
757 numberOfBytesConsumed += unpackB(propertyId);
758 MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
759 switch (propertyType) {
760 case PAYLOAD_FORMAT_INDICATOR:
761 case REQUEST_PROBLEM_INFORMATION:
762 case REQUEST_RESPONSE_INFORMATION:
763 case MAXIMUM_QOS:
764 case RETAIN_AVAILABLE:
765 case WILDCARD_SUBSCRIPTION_AVAILABLE:
766 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
767 case SHARED_SUBSCRIPTION_AVAILABLE:
768 final int b1 = buffer.readUnsignedByte();
769 numberOfBytesConsumed++;
770 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
771 break;
772 case SERVER_KEEP_ALIVE:
773 case RECEIVE_MAXIMUM:
774 case TOPIC_ALIAS_MAXIMUM:
775 case TOPIC_ALIAS:
776 final int int2BytesResult = decodeMsbLsb(buffer);
777 numberOfBytesConsumed += 2;
778 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
779 break;
780 case PUBLICATION_EXPIRY_INTERVAL:
781 case SESSION_EXPIRY_INTERVAL:
782 case WILL_DELAY_INTERVAL:
783 case MAXIMUM_PACKET_SIZE:
784 final int maxPacketSize = buffer.readInt();
785 numberOfBytesConsumed += 4;
786 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
787 break;
788 case SUBSCRIPTION_IDENTIFIER:
789 long vbIntegerResult = decodeVariableByteInteger(buffer);
790 numberOfBytesConsumed += unpackB(vbIntegerResult);
791 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
792 break;
793 case CONTENT_TYPE:
794 case RESPONSE_TOPIC:
795 case ASSIGNED_CLIENT_IDENTIFIER:
796 case AUTHENTICATION_METHOD:
797 case RESPONSE_INFORMATION:
798 case SERVER_REFERENCE:
799 case REASON_STRING:
800 final Result<String> stringResult = decodeString(buffer);
801 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
802 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
803 break;
804 case USER_PROPERTY:
805 final Result<String> keyResult = decodeString(buffer);
806 final Result<String> valueResult = decodeString(buffer);
807 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
808 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
809 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
810 break;
811 case CORRELATION_DATA:
812 case AUTHENTICATION_DATA:
813 final byte[] binaryDataResult = decodeByteArray(buffer);
814 numberOfBytesConsumed += binaryDataResult.length + 2;
815 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
816 break;
817 default:
818
819 throw new DecoderException("Unknown property type: " + propertyType);
820 }
821 }
822
823 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
824 }
825 }