1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.Signal;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.ArrayList;
31 import java.util.List;
32
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40 import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
41 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
42 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
43 import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
44 import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
45 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
46 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
47 import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
48 import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
49 import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
50 import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
51 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
52 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
53 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
54 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
55 import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
56 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
57 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
58 import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
59 import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
60 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
61 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
62 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
63 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
64 import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
65 import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
66 import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
67 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
68
69
70
71
72
73
74
75
76
77 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
78
79
80
81
82
83
/**
 * States of the decoder.
 * A new decoder starts in {@code READ_FIXED_HEADER}; {@code decode} then moves through
 * {@code READ_VARIABLE_HEADER} and {@code READ_PAYLOAD} and returns to {@code READ_FIXED_HEADER}
 * after a message has been emitted. {@code BAD_MESSAGE} is entered when decoding fails; in that
 * state all incoming bytes are discarded.
 */
enum DecoderState {
    READ_FIXED_HEADER,
    READ_VARIABLE_HEADER,
    READ_PAYLOAD,
    BAD_MESSAGE,
}
90
91 private MqttFixedHeader mqttFixedHeader;
92 private Object variableHeader;
93 private int bytesRemainingInVariablePart;
94
95 private final int maxBytesInMessage;
96 private final int maxClientIdLength;
97
/**
 * Creates a decoder that uses {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH} as its limits.
 */
public MqttDecoder() {
    this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
101
/**
 * Creates a decoder with a custom message size limit and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH} as the client identifier limit.
 *
 * @param maxBytesInMessage largest accepted "remaining length" of a message, in bytes
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
105
/**
 * Creates a decoder with custom limits. The decoder starts in
 * {@code DecoderState.READ_FIXED_HEADER}.
 *
 * @param maxBytesInMessage largest accepted "remaining length" of a message, in bytes;
 *                          validated with {@code ObjectUtil.checkPositive}
 * @param maxClientIdLength limit passed to the client identifier validation of CONNECT payloads;
 *                          validated with {@code ObjectUtil.checkPositive}
 */
public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
    super(DecoderState.READ_FIXED_HEADER);
    this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
    this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
}
111
112 @Override
113 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
114 switch (state()) {
115 case READ_FIXED_HEADER: try {
116 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
117 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
118 checkpoint(DecoderState.READ_VARIABLE_HEADER);
119
120 } catch (Exception cause) {
121 out.add(invalidMessage(cause));
122 return;
123 }
124
125 case READ_VARIABLE_HEADER: try {
126 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
127 int initialAvailableBytes = buffer.readableBytes();
128 boolean bailOut = false;
129 try {
130 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
131 } catch (Signal signal) {
132 if (initialAvailableBytes < maxBytesInMessage) {
133
134 throw signal;
135 } else {
136
137
138
139 bailOut = true;
140 }
141 }
142 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
143 buffer.skipBytes(actualReadableBytes());
144 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
145 + bytesRemainingBeforeVariableHeader);
146 }
147 checkpoint(DecoderState.READ_PAYLOAD);
148
149 } catch (Exception cause) {
150 out.add(invalidMessage(cause));
151 return;
152 }
153
154 case READ_PAYLOAD: try {
155 final Object decodedPayload =
156 decodePayload(
157 ctx,
158 buffer,
159 mqttFixedHeader.messageType(),
160 maxClientIdLength,
161 variableHeader);
162 checkpoint(DecoderState.READ_FIXED_HEADER);
163 MqttMessage message = MqttMessageFactory.newMessage(
164 mqttFixedHeader, variableHeader, decodedPayload);
165 mqttFixedHeader = null;
166 variableHeader = null;
167 out.add(message);
168 break;
169 } catch (Exception cause) {
170 out.add(invalidMessage(cause));
171 return;
172 }
173
174 case BAD_MESSAGE:
175
176 buffer.skipBytes(actualReadableBytes());
177 break;
178
179 default:
180
181 throw new Error("Unexpected mqtt decoder state: " + state());
182 }
183 }
184
185 private MqttMessage invalidMessage(Throwable cause) {
186 checkpoint(DecoderState.BAD_MESSAGE);
187 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
188 }
189
190
191
192
193
194
195
196
197
198
199
200 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
201 short b1 = buffer.readUnsignedByte();
202
203 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
204 boolean dupFlag = (b1 & 0x08) == 0x08;
205 int qosLevel = (b1 & 0x06) >> 1;
206 boolean retain = (b1 & 0x01) != 0;
207
208 switch (messageType) {
209 case PUBLISH:
210 if (qosLevel == 3) {
211 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
212 + qosLevel + ')');
213 }
214 break;
215
216 case PUBREL:
217 case SUBSCRIBE:
218 case UNSUBSCRIBE:
219 if (dupFlag) {
220 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
221 + " message, must be 0, found 1");
222 }
223 if (qosLevel != 1) {
224 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
225 + " message, must be 1, found " + qosLevel);
226 }
227 if (retain) {
228 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
229 + " message, must be 0, found 1");
230 }
231 break;
232
233 case AUTH:
234 case CONNACK:
235 case CONNECT:
236 case DISCONNECT:
237 case PINGREQ:
238 case PINGRESP:
239 case PUBACK:
240 case PUBCOMP:
241 case PUBREC:
242 case SUBACK:
243 case UNSUBACK:
244 if (dupFlag) {
245 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
246 + " message, must be 0, found 1");
247 }
248 if (qosLevel != 0) {
249 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
250 + " message, must be 0, found " + qosLevel);
251 }
252 if (retain) {
253 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
254 + " message, must be 0, found 1");
255 }
256 break;
257 default:
258 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
259 }
260
261 int remainingLength = parseRemainingLength(buffer, messageType);
262 MqttFixedHeader decodedFixedHeader =
263 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
264 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
265 }
266
267 private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
268 int remainingLength = 0;
269 int multiplier = 1;
270
271 for (int i = 0; i < 4; i++) {
272 short digit = buffer.readUnsignedByte();
273 remainingLength += (digit & 127) * multiplier;
274
275 if ((digit & 128) == 0) {
276 return remainingLength;
277 }
278
279 multiplier *= 128;
280 }
281
282
283 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
284 }
285
286
287
288
289
290
291
292 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
293 switch (mqttFixedHeader.messageType()) {
294 case CONNECT:
295 return decodeConnectionVariableHeader(ctx, buffer);
296
297 case CONNACK:
298 return decodeConnAckVariableHeader(ctx, buffer);
299
300 case UNSUBSCRIBE:
301 case SUBSCRIBE:
302 case SUBACK:
303 case UNSUBACK:
304 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
305
306 case PUBACK:
307 case PUBREC:
308 case PUBCOMP:
309 case PUBREL:
310 return decodePubReplyMessage(buffer);
311
312 case PUBLISH:
313 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
314
315 case DISCONNECT:
316 case AUTH:
317 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
318
319 case PINGREQ:
320 case PINGRESP:
321
322 return null;
323 default:
324
325 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
326 }
327 }
328
329 private MqttConnectVariableHeader decodeConnectionVariableHeader(
330 ChannelHandlerContext ctx,
331 ByteBuf buffer) {
332 final Result<String> protoString = decodeString(buffer);
333 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
334
335 final byte protocolLevel = buffer.readByte();
336 numberOfBytesConsumed += 1;
337
338 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
339 MqttCodecUtil.setMqttVersion(ctx, version);
340
341 final int b1 = buffer.readUnsignedByte();
342 numberOfBytesConsumed += 1;
343
344 final int keepAlive = decodeMsbLsb(buffer);
345 numberOfBytesConsumed += 2;
346
347 final boolean hasUserName = (b1 & 0x80) == 0x80;
348 final boolean hasPassword = (b1 & 0x40) == 0x40;
349 final boolean willRetain = (b1 & 0x20) == 0x20;
350 final int willQos = (b1 & 0x18) >> 3;
351 final boolean willFlag = (b1 & 0x04) == 0x04;
352 final boolean cleanSession = (b1 & 0x02) == 0x02;
353 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
354 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
355 if (!zeroReservedFlag) {
356
357
358
359 throw new DecoderException("non-zero reserved flag");
360 }
361 }
362
363 final MqttProperties properties;
364 if (version == MqttVersion.MQTT_5) {
365 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
366 properties = propertiesResult.value;
367 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
368 } else {
369 properties = MqttProperties.NO_PROPERTIES;
370 }
371
372 bytesRemainingInVariablePart -= numberOfBytesConsumed;
373 return new MqttConnectVariableHeader(
374 version.protocolName(),
375 version.protocolLevel(),
376 hasUserName,
377 hasPassword,
378 willRetain,
379 willQos,
380 willFlag,
381 cleanSession,
382 keepAlive,
383 properties);
384 }
385
386 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
387 ChannelHandlerContext ctx,
388 ByteBuf buffer) {
389 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
390 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
391 byte returnCode = buffer.readByte();
392
393 final MqttProperties properties;
394 if (mqttVersion == MqttVersion.MQTT_5) {
395 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
396 properties = propertiesResult.value;
397 bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
398 } else {
399 properties = MqttProperties.NO_PROPERTIES;
400 bytesRemainingInVariablePart -= 2;
401 }
402
403 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
404 }
405
406 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
407 ChannelHandlerContext ctx,
408 ByteBuf buffer) {
409 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
410 final int packetId = decodeMessageId(buffer);
411
412 if (mqttVersion == MqttVersion.MQTT_5) {
413 final Result<MqttProperties> properties = decodeProperties(buffer);
414 bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
415 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
416 } else {
417 bytesRemainingInVariablePart -= 2;
418 return new MqttMessageIdAndPropertiesVariableHeader(packetId,
419 MqttProperties.NO_PROPERTIES);
420 }
421 }
422
423 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
424 final int packetId = decodeMessageId(buffer);
425
426 final int packetIdNumberOfBytesConsumed = 2;
427 if (bytesRemainingInVariablePart > 3) {
428 final byte reasonCode = buffer.readByte();
429 final Result<MqttProperties> properties = decodeProperties(buffer);
430 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
431 return new MqttPubReplyMessageVariableHeader(packetId,
432 reasonCode,
433 properties.value);
434 } else if (bytesRemainingInVariablePart > 2) {
435 final byte reasonCode = buffer.readByte();
436 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
437 return new MqttPubReplyMessageVariableHeader(packetId,
438 reasonCode,
439 MqttProperties.NO_PROPERTIES);
440 } else {
441 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
442 return new MqttPubReplyMessageVariableHeader(packetId,
443 (byte) 0,
444 MqttProperties.NO_PROPERTIES);
445 }
446 }
447
448 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
449 ByteBuf buffer) {
450 final byte reasonCode;
451 final MqttProperties properties;
452 if (bytesRemainingInVariablePart > 1) {
453 reasonCode = buffer.readByte();
454 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
455 properties = propertiesResult.value;
456 bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
457 } else if (bytesRemainingInVariablePart > 0) {
458 reasonCode = buffer.readByte();
459 properties = MqttProperties.NO_PROPERTIES;
460 --bytesRemainingInVariablePart;
461 } else {
462 reasonCode = 0;
463 properties = MqttProperties.NO_PROPERTIES;
464 }
465
466 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
467 }
468
469 private MqttPublishVariableHeader decodePublishVariableHeader(
470 ChannelHandlerContext ctx,
471 ByteBuf buffer,
472 MqttFixedHeader mqttFixedHeader) {
473 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
474 final Result<String> decodedTopic = decodeString(buffer);
475 if (!isValidPublishTopicName(decodedTopic.value)) {
476 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
477 }
478 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
479
480 int messageId = -1;
481 if (mqttFixedHeader.qosLevel().value() > 0) {
482 messageId = decodeMessageId(buffer);
483 numberOfBytesConsumed += 2;
484 }
485
486 final MqttProperties properties;
487 if (mqttVersion == MqttVersion.MQTT_5) {
488 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
489 properties = propertiesResult.value;
490 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
491 } else {
492 properties = MqttProperties.NO_PROPERTIES;
493 }
494
495 bytesRemainingInVariablePart -= numberOfBytesConsumed;
496 return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
497 }
498
499
500
501
502 private static int decodeMessageId(ByteBuf buffer) {
503 final int messageId = decodeMsbLsb(buffer);
504 if (!isValidMessageId(messageId)) {
505 throw new DecoderException("invalid messageId: " + messageId);
506 }
507 return messageId;
508 }
509
510
511
512
513
514
515
516
517
518 private Object decodePayload(
519 ChannelHandlerContext ctx,
520 ByteBuf buffer,
521 MqttMessageType messageType,
522 int maxClientIdLength,
523 Object variableHeader) {
524 switch (messageType) {
525 case CONNECT:
526 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
527
528 case SUBSCRIBE:
529 return decodeSubscribePayload(buffer);
530
531 case SUBACK:
532 return decodeSubackPayload(buffer);
533
534 case UNSUBSCRIBE:
535 return decodeUnsubscribePayload(buffer);
536
537 case UNSUBACK:
538 return decodeUnsubAckPayload(ctx, buffer);
539
540 case PUBLISH:
541 return decodePublishPayload(buffer);
542
543 default:
544
545 return null;
546 }
547 }
548
549 private MqttConnectPayload decodeConnectionPayload(
550 ByteBuf buffer,
551 int maxClientIdLength,
552 MqttConnectVariableHeader mqttConnectVariableHeader) {
553 final Result<String> decodedClientId = decodeString(buffer);
554 final String decodedClientIdValue = decodedClientId.value;
555 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
556 (byte) mqttConnectVariableHeader.version());
557 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
558 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
559 }
560 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
561
562 Result<String> decodedWillTopic = null;
563 byte[] decodedWillMessage = null;
564
565 final MqttProperties willProperties;
566 if (mqttConnectVariableHeader.isWillFlag()) {
567 if (mqttVersion == MqttVersion.MQTT_5) {
568 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
569 willProperties = propertiesResult.value;
570 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
571 } else {
572 willProperties = MqttProperties.NO_PROPERTIES;
573 }
574 decodedWillTopic = decodeString(buffer, 0, 32767);
575 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
576 decodedWillMessage = decodeByteArray(buffer);
577 numberOfBytesConsumed += decodedWillMessage.length + 2;
578 } else {
579 willProperties = MqttProperties.NO_PROPERTIES;
580 }
581 Result<String> decodedUserName = null;
582 byte[] decodedPassword = null;
583 if (mqttConnectVariableHeader.hasUserName()) {
584 decodedUserName = decodeString(buffer);
585 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
586 }
587 if (mqttConnectVariableHeader.hasPassword()) {
588 decodedPassword = decodeByteArray(buffer);
589 numberOfBytesConsumed += decodedPassword.length + 2;
590 }
591
592 validateNoBytesRemain(numberOfBytesConsumed);
593 return new MqttConnectPayload(
594 decodedClientId.value,
595 willProperties,
596 decodedWillTopic != null ? decodedWillTopic.value : null,
597 decodedWillMessage,
598 decodedUserName != null ? decodedUserName.value : null,
599 decodedPassword);
600 }
601
602 private MqttSubscribePayload decodeSubscribePayload(
603 ByteBuf buffer) {
604 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
605 int numberOfBytesConsumed = 0;
606 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
607 final Result<String> decodedTopicName = decodeString(buffer);
608 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
609
610 final short optionByte = buffer.readUnsignedByte();
611
612 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
613 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
614 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
615 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
616
617 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
618 noLocal,
619 retainAsPublished,
620 retainHandling);
621
622 numberOfBytesConsumed++;
623 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
624 }
625 validateNoBytesRemain(numberOfBytesConsumed);
626 return new MqttSubscribePayload(subscribeTopics);
627 }
628
629 private MqttSubAckPayload decodeSubackPayload(
630 ByteBuf buffer) {
631 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
632 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
633 int numberOfBytesConsumed = 0;
634 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
635 int reasonCode = buffer.readUnsignedByte();
636 numberOfBytesConsumed++;
637 grantedQos.add(reasonCode);
638 }
639 validateNoBytesRemain(numberOfBytesConsumed);
640 return new MqttSubAckPayload(grantedQos);
641 }
642
643 private MqttUnsubAckPayload decodeUnsubAckPayload(
644 ChannelHandlerContext ctx,
645 ByteBuf buffer) {
646 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
647 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
648 int numberOfBytesConsumed = 0;
649 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
650 short reasonCode = buffer.readUnsignedByte();
651 numberOfBytesConsumed++;
652 reasonCodes.add(reasonCode);
653 }
654 validateNoBytesRemain(numberOfBytesConsumed);
655 return new MqttUnsubAckPayload(reasonCodes);
656 }
657
658 private MqttUnsubscribePayload decodeUnsubscribePayload(
659 ByteBuf buffer) {
660 final List<String> unsubscribeTopics = new ArrayList<String>();
661 int numberOfBytesConsumed = 0;
662 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
663 final Result<String> decodedTopicName = decodeString(buffer);
664 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
665 unsubscribeTopics.add(decodedTopicName.value);
666 }
667 validateNoBytesRemain(numberOfBytesConsumed);
668 return new MqttUnsubscribePayload(unsubscribeTopics);
669 }
670
671 private ByteBuf decodePublishPayload(ByteBuf buffer) {
672 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
673 }
674
675 private void validateNoBytesRemain(int numberOfBytesConsumed) {
676 bytesRemainingInVariablePart -= numberOfBytesConsumed;
677 if (bytesRemainingInVariablePart != 0) {
678 throw new DecoderException(
679 "non-zero remaining payload bytes: " +
680 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
681 }
682 }
683
/**
 * Decodes a length-prefixed string without restricting its size: delegates to
 * {@code decodeString(buffer, 0, Integer.MAX_VALUE)}.
 *
 * @param buffer the buffer to read from
 * @return the decoded string together with the number of bytes consumed
 */
private static Result<String> decodeString(ByteBuf buffer) {
    return decodeString(buffer, 0, Integer.MAX_VALUE);
}
687
688 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
689 int size = decodeMsbLsb(buffer);
690 int numberOfBytesConsumed = 2;
691 if (size < minBytes || size > maxBytes) {
692 buffer.skipBytes(size);
693 numberOfBytesConsumed += size;
694 return new Result<String>(null, numberOfBytesConsumed);
695 }
696 String s = buffer.readString(size, CharsetUtil.UTF_8);
697 numberOfBytesConsumed += size;
698 return new Result<String>(s, numberOfBytesConsumed);
699 }
700
701
702
703
704
705 private static byte[] decodeByteArray(ByteBuf buffer) {
706 int size = decodeMsbLsb(buffer);
707 byte[] bytes = new byte[size];
708 buffer.readBytes(bytes);
709 return bytes;
710 }
711
712
713 private static long packInts(int a, int b) {
714 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
715 }
716
717 private static int unpackA(long ints) {
718 return (int) (ints >> 32);
719 }
720
721 private static int unpackB(long ints) {
722 return (int) ints;
723 }
724
725
726
727
728 private static int decodeMsbLsb(ByteBuf buffer) {
729 int min = 0;
730 int max = 65535;
731 short msbSize = buffer.readUnsignedByte();
732 short lsbSize = buffer.readUnsignedByte();
733 int result = msbSize << 8 | lsbSize;
734 if (result < min || result > max) {
735 result = -1;
736 }
737 return result;
738 }
739
740
741
742
743
744
745
746
747 private static long decodeVariableByteInteger(ByteBuf buffer) {
748 int remainingLength = 0;
749 int multiplier = 1;
750
751 for (int i = 0; i < 4; i++) {
752 short digit = buffer.readUnsignedByte();
753 remainingLength += (digit & 127) * multiplier;
754
755 if ((digit & 128) == 0) {
756 return packInts(remainingLength, i + 1);
757 }
758
759 multiplier *= 128;
760 }
761
762 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
763 }
764
/**
 * Pairs a decoded value with the number of bytes that were read from the buffer to produce it.
 *
 * @param <T> type of the decoded value
 */
private static final class Result<T> {

    // The decoded value.
    private final T value;
    // Bytes read from the buffer while decoding the value.
    private final int numberOfBytesConsumed;

    Result(T value, int numberOfBytesConsumed) {
        this.value = value;
        this.numberOfBytesConsumed = numberOfBytesConsumed;
    }
}
775
776 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
777 final long propertiesLength = decodeVariableByteInteger(buffer);
778 int totalPropertiesLength = unpackA(propertiesLength);
779 int numberOfBytesConsumed = unpackB(propertiesLength);
780 if (buffer.readableBytes() < totalPropertiesLength) {
781
782 buffer.readSlice(totalPropertiesLength);
783 }
784
785 MqttProperties decodedProperties = new MqttProperties();
786 while (numberOfBytesConsumed < totalPropertiesLength) {
787 long propertyId = decodeVariableByteInteger(buffer);
788 final int propertyIdValue = unpackA(propertyId);
789 numberOfBytesConsumed += unpackB(propertyId);
790 switch (propertyIdValue) {
791 case PAYLOAD_FORMAT_INDICATOR:
792 case REQUEST_PROBLEM_INFORMATION:
793 case REQUEST_RESPONSE_INFORMATION:
794 case MAXIMUM_QOS:
795 case RETAIN_AVAILABLE:
796 case WILDCARD_SUBSCRIPTION_AVAILABLE:
797 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
798 case SHARED_SUBSCRIPTION_AVAILABLE:
799 final int b1 = buffer.readUnsignedByte();
800 numberOfBytesConsumed++;
801 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
802 break;
803 case SERVER_KEEP_ALIVE:
804 case RECEIVE_MAXIMUM:
805 case TOPIC_ALIAS_MAXIMUM:
806 case TOPIC_ALIAS:
807 final int int2BytesResult = decodeMsbLsb(buffer);
808 numberOfBytesConsumed += 2;
809 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
810 break;
811 case PUBLICATION_EXPIRY_INTERVAL:
812 case SESSION_EXPIRY_INTERVAL:
813 case WILL_DELAY_INTERVAL:
814 case MAXIMUM_PACKET_SIZE:
815 final int maxPacketSize = buffer.readInt();
816 numberOfBytesConsumed += 4;
817 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
818 break;
819 case SUBSCRIPTION_IDENTIFIER:
820 long vbIntegerResult = decodeVariableByteInteger(buffer);
821 numberOfBytesConsumed += unpackB(vbIntegerResult);
822 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
823 break;
824 case CONTENT_TYPE:
825 case RESPONSE_TOPIC:
826 case ASSIGNED_CLIENT_IDENTIFIER:
827 case AUTHENTICATION_METHOD:
828 case RESPONSE_INFORMATION:
829 case SERVER_REFERENCE:
830 case REASON_STRING:
831 final Result<String> stringResult = decodeString(buffer);
832 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
833 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
834 break;
835 case USER_PROPERTY:
836 final Result<String> keyResult = decodeString(buffer);
837 final Result<String> valueResult = decodeString(buffer);
838 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
839 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
840 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
841 break;
842 case CORRELATION_DATA:
843 case AUTHENTICATION_DATA:
844 final byte[] binaryDataResult = decodeByteArray(buffer);
845 numberOfBytesConsumed += binaryDataResult.length + 2;
846 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
847 break;
848 default:
849
850 throw new DecoderException("Unknown property type: " + propertyIdValue);
851 }
852 }
853
854 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
855 }
856 }