1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.internal.ObjectUtil;
28
29 import java.util.ArrayList;
30 import java.util.List;
31
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39 import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
40 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
41 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
42 import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
43 import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
44 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
45 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
46 import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
47 import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
48 import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
49 import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
50 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
51 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
52 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
53 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
54 import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
55 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
56 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
57 import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
58 import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
59 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
60 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
61 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
62 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
63 import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
64 import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
65 import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
66 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
67
68
69
70
71
72
73
74
75
76 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
77
78
79
80
81
82
/**
 * Stages the decoder moves through while decoding one MQTT message.
 */
enum DecoderState {
    /** Next input is the fixed header (message type, flags, remaining length). */
    READ_FIXED_HEADER,

    /** Fixed header decoded; next input is the message-type specific variable header. */
    READ_VARIABLE_HEADER,

    /** Variable header decoded; next input is the payload. */
    READ_PAYLOAD,

    /** Entered after a decoding failure; all further input is skipped. */
    BAD_MESSAGE,
}
89
90 private MqttFixedHeader mqttFixedHeader;
91 private Object variableHeader;
92 private int bytesRemainingInVariablePart;
93
94 private final int maxBytesInMessage;
95 private final int maxClientIdLength;
96
/**
 * Creates a decoder that uses {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 */
public MqttDecoder() {
    this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
100
/**
 * Creates a decoder with a custom message size limit and {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 *
 * @param maxBytesInMessage largest accepted remaining length; must be positive
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
104
/**
 * Creates a decoder that starts in {@link DecoderState#READ_FIXED_HEADER}.
 *
 * @param maxBytesInMessage largest accepted remaining length; checked with
 *        {@code ObjectUtil.checkPositive}
 * @param maxClientIdLength limit handed to the client identifier validation; checked with
 *        {@code ObjectUtil.checkPositive}
 */
public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
    super(DecoderState.READ_FIXED_HEADER);
    this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
    this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
}
110
111 @Override
112 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
113 switch (state()) {
114 case READ_FIXED_HEADER: try {
115 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
116 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
117 checkpoint(DecoderState.READ_VARIABLE_HEADER);
118
119 } catch (Exception cause) {
120 out.add(invalidMessage(cause));
121 return;
122 }
123
124 case READ_VARIABLE_HEADER: try {
125 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
126 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
127 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
128 buffer.skipBytes(actualReadableBytes());
129 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
130 + bytesRemainingBeforeVariableHeader);
131 }
132 checkpoint(DecoderState.READ_PAYLOAD);
133
134 } catch (Exception cause) {
135 out.add(invalidMessage(cause));
136 return;
137 }
138
139 case READ_PAYLOAD: try {
140 final Object decodedPayload =
141 decodePayload(
142 ctx,
143 buffer,
144 mqttFixedHeader.messageType(),
145 maxClientIdLength,
146 variableHeader);
147 checkpoint(DecoderState.READ_FIXED_HEADER);
148 MqttMessage message = MqttMessageFactory.newMessage(
149 mqttFixedHeader, variableHeader, decodedPayload);
150 mqttFixedHeader = null;
151 variableHeader = null;
152 out.add(message);
153 break;
154 } catch (Exception cause) {
155 out.add(invalidMessage(cause));
156 return;
157 }
158
159 case BAD_MESSAGE:
160
161 buffer.skipBytes(actualReadableBytes());
162 break;
163
164 default:
165
166 throw new Error();
167 }
168 }
169
170 private MqttMessage invalidMessage(Throwable cause) {
171 checkpoint(DecoderState.BAD_MESSAGE);
172 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
173 }
174
175
176
177
178
179
180
181
182
183
184
185 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
186 short b1 = buffer.readUnsignedByte();
187
188 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
189 boolean dupFlag = (b1 & 0x08) == 0x08;
190 int qosLevel = (b1 & 0x06) >> 1;
191 boolean retain = (b1 & 0x01) != 0;
192
193 switch (messageType) {
194 case PUBLISH:
195 if (qosLevel == 3) {
196 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
197 + qosLevel + ')');
198 }
199 break;
200
201 case PUBREL:
202 case SUBSCRIBE:
203 case UNSUBSCRIBE:
204 if (dupFlag) {
205 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
206 + " message, must be 0, found 1");
207 }
208 if (qosLevel != 1) {
209 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
210 + " message, must be 1, found " + qosLevel);
211 }
212 if (retain) {
213 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
214 + " message, must be 0, found 1");
215 }
216 break;
217
218 case AUTH:
219 case CONNACK:
220 case CONNECT:
221 case DISCONNECT:
222 case PINGREQ:
223 case PINGRESP:
224 case PUBACK:
225 case PUBCOMP:
226 case PUBREC:
227 case SUBACK:
228 case UNSUBACK:
229 if (dupFlag) {
230 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
231 + " message, must be 0, found 1");
232 }
233 if (qosLevel != 0) {
234 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
235 + " message, must be 0, found " + qosLevel);
236 }
237 if (retain) {
238 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
239 + " message, must be 0, found 1");
240 }
241 break;
242 default:
243 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
244 }
245
246 int remainingLength = parseRemainingLength(buffer, messageType);
247 MqttFixedHeader decodedFixedHeader =
248 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
249 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
250 }
251
252 private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
253 int remainingLength = 0;
254 int multiplier = 1;
255
256 for (int i = 0; i < 4; i++) {
257 short digit = buffer.readUnsignedByte();
258 remainingLength += (digit & 127) * multiplier;
259
260 if ((digit & 128) == 0) {
261 return remainingLength;
262 }
263
264 multiplier *= 128;
265 }
266
267
268 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
269 }
270
271
272
273
274
275
276
277 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
278 switch (mqttFixedHeader.messageType()) {
279 case CONNECT:
280 return decodeConnectionVariableHeader(ctx, buffer);
281
282 case CONNACK:
283 return decodeConnAckVariableHeader(ctx, buffer);
284
285 case UNSUBSCRIBE:
286 case SUBSCRIBE:
287 case SUBACK:
288 case UNSUBACK:
289 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
290
291 case PUBACK:
292 case PUBREC:
293 case PUBCOMP:
294 case PUBREL:
295 return decodePubReplyMessage(buffer);
296
297 case PUBLISH:
298 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
299
300 case DISCONNECT:
301 case AUTH:
302 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
303
304 case PINGREQ:
305 case PINGRESP:
306
307 return null;
308 default:
309
310 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
311 }
312 }
313
314 private MqttConnectVariableHeader decodeConnectionVariableHeader(
315 ChannelHandlerContext ctx,
316 ByteBuf buffer) {
317 final Result<String> protoString = decodeString(buffer);
318 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
319
320 final byte protocolLevel = buffer.readByte();
321 numberOfBytesConsumed += 1;
322
323 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
324 MqttCodecUtil.setMqttVersion(ctx, version);
325
326 final int b1 = buffer.readUnsignedByte();
327 numberOfBytesConsumed += 1;
328
329 final int keepAlive = decodeMsbLsb(buffer);
330 numberOfBytesConsumed += 2;
331
332 final boolean hasUserName = (b1 & 0x80) == 0x80;
333 final boolean hasPassword = (b1 & 0x40) == 0x40;
334 final boolean willRetain = (b1 & 0x20) == 0x20;
335 final int willQos = (b1 & 0x18) >> 3;
336 final boolean willFlag = (b1 & 0x04) == 0x04;
337 final boolean cleanSession = (b1 & 0x02) == 0x02;
338 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
339 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
340 if (!zeroReservedFlag) {
341
342
343
344 throw new DecoderException("non-zero reserved flag");
345 }
346 }
347
348 final MqttProperties properties;
349 if (version == MqttVersion.MQTT_5) {
350 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
351 properties = propertiesResult.value;
352 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
353 } else {
354 properties = MqttProperties.NO_PROPERTIES;
355 }
356
357 bytesRemainingInVariablePart -= numberOfBytesConsumed;
358 return new MqttConnectVariableHeader(
359 version.protocolName(),
360 version.protocolLevel(),
361 hasUserName,
362 hasPassword,
363 willRetain,
364 willQos,
365 willFlag,
366 cleanSession,
367 keepAlive,
368 properties);
369 }
370
371 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
372 ChannelHandlerContext ctx,
373 ByteBuf buffer) {
374 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
375 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
376 byte returnCode = buffer.readByte();
377
378 final MqttProperties properties;
379 if (mqttVersion == MqttVersion.MQTT_5) {
380 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
381 properties = propertiesResult.value;
382 bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
383 } else {
384 properties = MqttProperties.NO_PROPERTIES;
385 bytesRemainingInVariablePart -= 2;
386 }
387
388 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
389 }
390
391 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
392 ChannelHandlerContext ctx,
393 ByteBuf buffer) {
394 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
395 final int packetId = decodeMessageId(buffer);
396
397 if (mqttVersion == MqttVersion.MQTT_5) {
398 final Result<MqttProperties> properties = decodeProperties(buffer);
399 bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
400 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
401 } else {
402 bytesRemainingInVariablePart -= 2;
403 return new MqttMessageIdAndPropertiesVariableHeader(packetId,
404 MqttProperties.NO_PROPERTIES);
405 }
406 }
407
408 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
409 final int packetId = decodeMessageId(buffer);
410
411 final int packetIdNumberOfBytesConsumed = 2;
412 if (bytesRemainingInVariablePart > 3) {
413 final byte reasonCode = buffer.readByte();
414 final Result<MqttProperties> properties = decodeProperties(buffer);
415 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
416 return new MqttPubReplyMessageVariableHeader(packetId,
417 reasonCode,
418 properties.value);
419 } else if (bytesRemainingInVariablePart > 2) {
420 final byte reasonCode = buffer.readByte();
421 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
422 return new MqttPubReplyMessageVariableHeader(packetId,
423 reasonCode,
424 MqttProperties.NO_PROPERTIES);
425 } else {
426 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
427 return new MqttPubReplyMessageVariableHeader(packetId,
428 (byte) 0,
429 MqttProperties.NO_PROPERTIES);
430 }
431 }
432
433 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
434 ByteBuf buffer) {
435 final byte reasonCode;
436 final MqttProperties properties;
437 if (bytesRemainingInVariablePart > 1) {
438 reasonCode = buffer.readByte();
439 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
440 properties = propertiesResult.value;
441 bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
442 } else if (bytesRemainingInVariablePart > 0) {
443 reasonCode = buffer.readByte();
444 properties = MqttProperties.NO_PROPERTIES;
445 --bytesRemainingInVariablePart;
446 } else {
447 reasonCode = 0;
448 properties = MqttProperties.NO_PROPERTIES;
449 }
450
451 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
452 }
453
454 private MqttPublishVariableHeader decodePublishVariableHeader(
455 ChannelHandlerContext ctx,
456 ByteBuf buffer,
457 MqttFixedHeader mqttFixedHeader) {
458 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
459 final Result<String> decodedTopic = decodeString(buffer);
460 if (!isValidPublishTopicName(decodedTopic.value)) {
461 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
462 }
463 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
464
465 int messageId = -1;
466 if (mqttFixedHeader.qosLevel().value() > 0) {
467 messageId = decodeMessageId(buffer);
468 numberOfBytesConsumed += 2;
469 }
470
471 final MqttProperties properties;
472 if (mqttVersion == MqttVersion.MQTT_5) {
473 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
474 properties = propertiesResult.value;
475 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
476 } else {
477 properties = MqttProperties.NO_PROPERTIES;
478 }
479
480 bytesRemainingInVariablePart -= numberOfBytesConsumed;
481 return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
482 }
483
484
485
486
487 private static int decodeMessageId(ByteBuf buffer) {
488 final int messageId = decodeMsbLsb(buffer);
489 if (!isValidMessageId(messageId)) {
490 throw new DecoderException("invalid messageId: " + messageId);
491 }
492 return messageId;
493 }
494
495
496
497
498
499
500
501
502
503 private Object decodePayload(
504 ChannelHandlerContext ctx,
505 ByteBuf buffer,
506 MqttMessageType messageType,
507 int maxClientIdLength,
508 Object variableHeader) {
509 switch (messageType) {
510 case CONNECT:
511 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
512
513 case SUBSCRIBE:
514 return decodeSubscribePayload(buffer);
515
516 case SUBACK:
517 return decodeSubackPayload(buffer);
518
519 case UNSUBSCRIBE:
520 return decodeUnsubscribePayload(buffer);
521
522 case UNSUBACK:
523 return decodeUnsubAckPayload(ctx, buffer);
524
525 case PUBLISH:
526 return decodePublishPayload(buffer);
527
528 default:
529
530 return null;
531 }
532 }
533
534 private MqttConnectPayload decodeConnectionPayload(
535 ByteBuf buffer,
536 int maxClientIdLength,
537 MqttConnectVariableHeader mqttConnectVariableHeader) {
538 final Result<String> decodedClientId = decodeString(buffer);
539 final String decodedClientIdValue = decodedClientId.value;
540 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
541 (byte) mqttConnectVariableHeader.version());
542 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
543 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
544 }
545 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
546
547 Result<String> decodedWillTopic = null;
548 byte[] decodedWillMessage = null;
549
550 final MqttProperties willProperties;
551 if (mqttConnectVariableHeader.isWillFlag()) {
552 if (mqttVersion == MqttVersion.MQTT_5) {
553 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
554 willProperties = propertiesResult.value;
555 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
556 } else {
557 willProperties = MqttProperties.NO_PROPERTIES;
558 }
559 decodedWillTopic = decodeString(buffer, 0, 32767);
560 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
561 decodedWillMessage = decodeByteArray(buffer);
562 numberOfBytesConsumed += decodedWillMessage.length + 2;
563 } else {
564 willProperties = MqttProperties.NO_PROPERTIES;
565 }
566 Result<String> decodedUserName = null;
567 byte[] decodedPassword = null;
568 if (mqttConnectVariableHeader.hasUserName()) {
569 decodedUserName = decodeString(buffer);
570 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
571 }
572 if (mqttConnectVariableHeader.hasPassword()) {
573 decodedPassword = decodeByteArray(buffer);
574 numberOfBytesConsumed += decodedPassword.length + 2;
575 }
576
577 validateNoBytesRemain(numberOfBytesConsumed);
578 return new MqttConnectPayload(
579 decodedClientId.value,
580 willProperties,
581 decodedWillTopic != null ? decodedWillTopic.value : null,
582 decodedWillMessage,
583 decodedUserName != null ? decodedUserName.value : null,
584 decodedPassword);
585 }
586
587 private MqttSubscribePayload decodeSubscribePayload(
588 ByteBuf buffer) {
589 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
590 int numberOfBytesConsumed = 0;
591 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
592 final Result<String> decodedTopicName = decodeString(buffer);
593 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
594
595 final short optionByte = buffer.readUnsignedByte();
596
597 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
598 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
599 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
600 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
601
602 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
603 noLocal,
604 retainAsPublished,
605 retainHandling);
606
607 numberOfBytesConsumed++;
608 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
609 }
610 validateNoBytesRemain(numberOfBytesConsumed);
611 return new MqttSubscribePayload(subscribeTopics);
612 }
613
614 private MqttSubAckPayload decodeSubackPayload(
615 ByteBuf buffer) {
616 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
617 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
618 int numberOfBytesConsumed = 0;
619 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
620 int reasonCode = buffer.readUnsignedByte();
621 numberOfBytesConsumed++;
622 grantedQos.add(reasonCode);
623 }
624 validateNoBytesRemain(numberOfBytesConsumed);
625 return new MqttSubAckPayload(grantedQos);
626 }
627
628 private MqttUnsubAckPayload decodeUnsubAckPayload(
629 ChannelHandlerContext ctx,
630 ByteBuf buffer) {
631 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
632 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
633 int numberOfBytesConsumed = 0;
634 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
635 short reasonCode = buffer.readUnsignedByte();
636 numberOfBytesConsumed++;
637 reasonCodes.add(reasonCode);
638 }
639 validateNoBytesRemain(numberOfBytesConsumed);
640 return new MqttUnsubAckPayload(reasonCodes);
641 }
642
643 private MqttUnsubscribePayload decodeUnsubscribePayload(
644 ByteBuf buffer) {
645 final List<String> unsubscribeTopics = new ArrayList<String>();
646 int numberOfBytesConsumed = 0;
647 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
648 final Result<String> decodedTopicName = decodeString(buffer);
649 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
650 unsubscribeTopics.add(decodedTopicName.value);
651 }
652 validateNoBytesRemain(numberOfBytesConsumed);
653 return new MqttUnsubscribePayload(unsubscribeTopics);
654 }
655
656 private ByteBuf decodePublishPayload(ByteBuf buffer) {
657 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
658 }
659
660 private void validateNoBytesRemain(int numberOfBytesConsumed) {
661 bytesRemainingInVariablePart -= numberOfBytesConsumed;
662 if (bytesRemainingInVariablePart != 0) {
663 throw new DecoderException(
664 "non-zero remaining payload bytes: " +
665 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
666 }
667 }
668
669 private static Result<String> decodeString(ByteBuf buffer) {
670 return decodeString(buffer, 0, Integer.MAX_VALUE);
671 }
672
673 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
674 int size = decodeMsbLsb(buffer);
675 int numberOfBytesConsumed = 2;
676 if (size < minBytes || size > maxBytes) {
677 buffer.skipBytes(size);
678 numberOfBytesConsumed += size;
679 return new Result<String>(null, numberOfBytesConsumed);
680 }
681 String s = buffer.readString(size, CharsetUtil.UTF_8);
682 numberOfBytesConsumed += size;
683 return new Result<String>(s, numberOfBytesConsumed);
684 }
685
686
687
688
689
690 private static byte[] decodeByteArray(ByteBuf buffer) {
691 int size = decodeMsbLsb(buffer);
692 byte[] bytes = new byte[size];
693 buffer.readBytes(bytes);
694 return bytes;
695 }
696
697
698 private static long packInts(int a, int b) {
699 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
700 }
701
702 private static int unpackA(long ints) {
703 return (int) (ints >> 32);
704 }
705
706 private static int unpackB(long ints) {
707 return (int) ints;
708 }
709
710
711
712
713 private static int decodeMsbLsb(ByteBuf buffer) {
714 int min = 0;
715 int max = 65535;
716 short msbSize = buffer.readUnsignedByte();
717 short lsbSize = buffer.readUnsignedByte();
718 int result = msbSize << 8 | lsbSize;
719 if (result < min || result > max) {
720 result = -1;
721 }
722 return result;
723 }
724
725
726
727
728
729
730
731
732 private static long decodeVariableByteInteger(ByteBuf buffer) {
733 int remainingLength = 0;
734 int multiplier = 1;
735
736 for (int i = 0; i < 4; i++) {
737 short digit = buffer.readUnsignedByte();
738 remainingLength += (digit & 127) * multiplier;
739
740 if ((digit & 128) == 0) {
741 return packInts(remainingLength, i + 1);
742 }
743
744 multiplier *= 128;
745 }
746
747 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
748 }
749
750 private static final class Result<T> {
751
752 private final T value;
753 private final int numberOfBytesConsumed;
754
755 Result(T value, int numberOfBytesConsumed) {
756 this.value = value;
757 this.numberOfBytesConsumed = numberOfBytesConsumed;
758 }
759 }
760
761 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
762 final long propertiesLength = decodeVariableByteInteger(buffer);
763 int totalPropertiesLength = unpackA(propertiesLength);
764 int numberOfBytesConsumed = unpackB(propertiesLength);
765
766 MqttProperties decodedProperties = new MqttProperties();
767 while (numberOfBytesConsumed < totalPropertiesLength) {
768 long propertyId = decodeVariableByteInteger(buffer);
769 final int propertyIdValue = unpackA(propertyId);
770 numberOfBytesConsumed += unpackB(propertyId);
771 switch (propertyIdValue) {
772 case PAYLOAD_FORMAT_INDICATOR:
773 case REQUEST_PROBLEM_INFORMATION:
774 case REQUEST_RESPONSE_INFORMATION:
775 case MAXIMUM_QOS:
776 case RETAIN_AVAILABLE:
777 case WILDCARD_SUBSCRIPTION_AVAILABLE:
778 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
779 case SHARED_SUBSCRIPTION_AVAILABLE:
780 final int b1 = buffer.readUnsignedByte();
781 numberOfBytesConsumed++;
782 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
783 break;
784 case SERVER_KEEP_ALIVE:
785 case RECEIVE_MAXIMUM:
786 case TOPIC_ALIAS_MAXIMUM:
787 case TOPIC_ALIAS:
788 final int int2BytesResult = decodeMsbLsb(buffer);
789 numberOfBytesConsumed += 2;
790 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
791 break;
792 case PUBLICATION_EXPIRY_INTERVAL:
793 case SESSION_EXPIRY_INTERVAL:
794 case WILL_DELAY_INTERVAL:
795 case MAXIMUM_PACKET_SIZE:
796 final int maxPacketSize = buffer.readInt();
797 numberOfBytesConsumed += 4;
798 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
799 break;
800 case SUBSCRIPTION_IDENTIFIER:
801 long vbIntegerResult = decodeVariableByteInteger(buffer);
802 numberOfBytesConsumed += unpackB(vbIntegerResult);
803 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
804 break;
805 case CONTENT_TYPE:
806 case RESPONSE_TOPIC:
807 case ASSIGNED_CLIENT_IDENTIFIER:
808 case AUTHENTICATION_METHOD:
809 case RESPONSE_INFORMATION:
810 case SERVER_REFERENCE:
811 case REASON_STRING:
812 final Result<String> stringResult = decodeString(buffer);
813 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
814 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
815 break;
816 case USER_PROPERTY:
817 final Result<String> keyResult = decodeString(buffer);
818 final Result<String> valueResult = decodeString(buffer);
819 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
820 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
821 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
822 break;
823 case CORRELATION_DATA:
824 case AUTHENTICATION_DATA:
825 final byte[] binaryDataResult = decodeByteArray(buffer);
826 numberOfBytesConsumed += binaryDataResult.length + 2;
827 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
828 break;
829 default:
830
831 throw new DecoderException("Unknown property type: " + propertyIdValue);
832 }
833 }
834
835 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
836 }
837 }