1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.Signal;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.ArrayList;
31 import java.util.List;
32
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40 import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
41 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
42 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
43 import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
44 import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
45 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
46 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
47 import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
48 import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
49 import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
50 import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
51 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
52 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
53 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
54 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
55 import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
56 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
57 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
58 import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
59 import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
60 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
61 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
62 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
63 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
64 import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
65 import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
66 import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
67 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
68
69
70
71
72
73
74
75
76
77 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
78
79
80
81
82
83
/**
 * Steps of the state machine that {@code decode} switches on.
 */
enum DecoderState {
    /** The fixed header of the next message is read in this state. */
    READ_FIXED_HEADER,

    /** The fixed header is available; the variable header is read in this state. */
    READ_VARIABLE_HEADER,

    /** Fixed and variable header are available; the payload is read in this state. */
    READ_PAYLOAD,

    /** Entered after a decoding failure; readable bytes are skipped in this state. */
    BAD_MESSAGE
}
90
// Fixed header of the message currently being decoded; assigned when the fixed header is read
// and set back to null once a complete message has been emitted.
91 private MqttFixedHeader mqttFixedHeader;
// Variable header of the message currently being decoded; stays null for message types whose
// variable header decodes to null (PINGREQ / PINGRESP).
92 private Object variableHeader;
// Number of bytes of the current message, counted after the fixed header, that the decode
// helpers have not yet accounted for. Initialised from the fixed header's remaining length.
93 private int bytesRemainingInVariablePart;
94
// Largest accepted remaining length; decode() raises TooLongFrameException above this value.
95 private final int maxBytesInMessage;
// Limit passed to isValidClientId when the CONNECT payload is decoded.
96 private final int maxClientIdLength;
97
/**
 * Creates a decoder using {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH} as limits.
 */
98 public MqttDecoder() {
99 this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
100 }
101
/**
 * Creates a decoder with a custom message size limit and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH} as client identifier limit.
 *
 * @param maxBytesInMessage largest accepted remaining length of a message
 */
102 public MqttDecoder(int maxBytesInMessage) {
103 this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
104 }
105
/**
 * Creates a decoder that starts in {@code READ_FIXED_HEADER} state with the given limits.
 * Both arguments are validated with {@code ObjectUtil.checkPositive}.
 *
 * @param maxBytesInMessage largest accepted remaining length of a message
 * @param maxClientIdLength limit handed to the client identifier validation of CONNECT messages
 */
106 public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
107 super(DecoderState.READ_FIXED_HEADER);
108 this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
109 this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
110 }
111
112 @Override
113 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
114 switch (state()) {
115 case READ_FIXED_HEADER: try {
116 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
117 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
118 checkpoint(DecoderState.READ_VARIABLE_HEADER);
119
120 } catch (Exception cause) {
121 out.add(invalidMessage(cause));
122 return;
123 }
124
125 case READ_VARIABLE_HEADER: try {
126 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
127 boolean bailOut = false;
128 try {
129 variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
130 } catch (Signal signal) {
131 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
132
133
134
135 bailOut = true;
136 } else {
137
138 throw signal;
139 }
140 }
141 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
142 buffer.skipBytes(actualReadableBytes());
143 throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
144 + bytesRemainingBeforeVariableHeader);
145 }
146 checkpoint(DecoderState.READ_PAYLOAD);
147
148 } catch (Exception cause) {
149 out.add(invalidMessage(cause));
150 return;
151 }
152
153 case READ_PAYLOAD: try {
154 final Object decodedPayload =
155 decodePayload(
156 ctx,
157 buffer,
158 mqttFixedHeader.messageType(),
159 maxClientIdLength,
160 variableHeader);
161 checkpoint(DecoderState.READ_FIXED_HEADER);
162 MqttMessage message = MqttMessageFactory.newMessage(
163 mqttFixedHeader, variableHeader, decodedPayload);
164 mqttFixedHeader = null;
165 variableHeader = null;
166 out.add(message);
167 break;
168 } catch (Exception cause) {
169 out.add(invalidMessage(cause));
170 return;
171 }
172
173 case BAD_MESSAGE:
174
175 buffer.skipBytes(actualReadableBytes());
176 break;
177
178 default:
179
180 throw new Error("Unexpected mqtt decoder state: " + state());
181 }
182 }
183
184 private MqttMessage invalidMessage(Throwable cause) {
185 checkpoint(DecoderState.BAD_MESSAGE);
186 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
187 }
188
189
190
191
192
193
194
195
196
197
198
199 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
200 short b1 = buffer.readUnsignedByte();
201
202 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
203 boolean dupFlag = (b1 & 0x08) == 0x08;
204 int qosLevel = (b1 & 0x06) >> 1;
205 boolean retain = (b1 & 0x01) != 0;
206
207 switch (messageType) {
208 case PUBLISH:
209 if (qosLevel == 3) {
210 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
211 + qosLevel + ')');
212 }
213 break;
214
215 case PUBREL:
216 case SUBSCRIBE:
217 case UNSUBSCRIBE:
218 if (dupFlag) {
219 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
220 + " message, must be 0, found 1");
221 }
222 if (qosLevel != 1) {
223 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
224 + " message, must be 1, found " + qosLevel);
225 }
226 if (retain) {
227 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
228 + " message, must be 0, found 1");
229 }
230 break;
231
232 case AUTH:
233 case CONNACK:
234 case CONNECT:
235 case DISCONNECT:
236 case PINGREQ:
237 case PINGRESP:
238 case PUBACK:
239 case PUBCOMP:
240 case PUBREC:
241 case SUBACK:
242 case UNSUBACK:
243 if (dupFlag) {
244 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
245 + " message, must be 0, found 1");
246 }
247 if (qosLevel != 0) {
248 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
249 + " message, must be 0, found " + qosLevel);
250 }
251 if (retain) {
252 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
253 + " message, must be 0, found 1");
254 }
255 break;
256 default:
257 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
258 }
259
260 int remainingLength = parseRemainingLength(buffer, messageType);
261 MqttFixedHeader decodedFixedHeader =
262 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
263 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
264 }
265
266 private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
267 int remainingLength = 0;
268 int multiplier = 1;
269
270 for (int i = 0; i < 4; i++) {
271 short digit = buffer.readUnsignedByte();
272 remainingLength += (digit & 127) * multiplier;
273
274 if ((digit & 128) == 0) {
275 return remainingLength;
276 }
277
278 multiplier *= 128;
279 }
280
281
282 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
283 }
284
285
286
287
288
289
290
291 private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
292 switch (mqttFixedHeader.messageType()) {
293 case CONNECT:
294 return decodeConnectionVariableHeader(ctx, buffer);
295
296 case CONNACK:
297 return decodeConnAckVariableHeader(ctx, buffer);
298
299 case UNSUBSCRIBE:
300 case SUBSCRIBE:
301 case SUBACK:
302 case UNSUBACK:
303 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
304
305 case PUBACK:
306 case PUBREC:
307 case PUBCOMP:
308 case PUBREL:
309 return decodePubReplyMessage(buffer);
310
311 case PUBLISH:
312 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
313
314 case DISCONNECT:
315 case AUTH:
316 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
317
318 case PINGREQ:
319 case PINGRESP:
320
321 return null;
322 default:
323
324 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
325 }
326 }
327
328 private MqttConnectVariableHeader decodeConnectionVariableHeader(
329 ChannelHandlerContext ctx,
330 ByteBuf buffer) {
331 final Result<String> protoString = decodeString(buffer);
332 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
333
334 final byte protocolLevel = buffer.readByte();
335 numberOfBytesConsumed += 1;
336
337 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
338 MqttCodecUtil.setMqttVersion(ctx, version);
339
340 final int b1 = buffer.readUnsignedByte();
341 numberOfBytesConsumed += 1;
342
343 final int keepAlive = decodeMsbLsb(buffer);
344 numberOfBytesConsumed += 2;
345
346 final boolean hasUserName = (b1 & 0x80) == 0x80;
347 final boolean hasPassword = (b1 & 0x40) == 0x40;
348 final boolean willRetain = (b1 & 0x20) == 0x20;
349 final int willQos = (b1 & 0x18) >> 3;
350 final boolean willFlag = (b1 & 0x04) == 0x04;
351 final boolean cleanSession = (b1 & 0x02) == 0x02;
352 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
353 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
354 if (!zeroReservedFlag) {
355
356
357
358 throw new DecoderException("non-zero reserved flag");
359 }
360 }
361
362 final MqttProperties properties;
363 if (version == MqttVersion.MQTT_5) {
364 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
365 properties = propertiesResult.value;
366 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
367 } else {
368 properties = MqttProperties.NO_PROPERTIES;
369 }
370
371 bytesRemainingInVariablePart -= numberOfBytesConsumed;
372 return new MqttConnectVariableHeader(
373 version.protocolName(),
374 version.protocolLevel(),
375 hasUserName,
376 hasPassword,
377 willRetain,
378 willQos,
379 willFlag,
380 cleanSession,
381 keepAlive,
382 properties);
383 }
384
385 private MqttConnAckVariableHeader decodeConnAckVariableHeader(
386 ChannelHandlerContext ctx,
387 ByteBuf buffer) {
388 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
389 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
390 byte returnCode = buffer.readByte();
391
392 final MqttProperties properties;
393 if (mqttVersion == MqttVersion.MQTT_5) {
394 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
395 properties = propertiesResult.value;
396 bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
397 } else {
398 properties = MqttProperties.NO_PROPERTIES;
399 bytesRemainingInVariablePart -= 2;
400 }
401
402 return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
403 }
404
405 private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
406 ChannelHandlerContext ctx,
407 ByteBuf buffer) {
408 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
409 final int packetId = decodeMessageId(buffer);
410
411 if (mqttVersion == MqttVersion.MQTT_5) {
412 final Result<MqttProperties> properties = decodeProperties(buffer);
413 bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
414 return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
415 } else {
416 bytesRemainingInVariablePart -= 2;
417 return new MqttMessageIdAndPropertiesVariableHeader(packetId,
418 MqttProperties.NO_PROPERTIES);
419 }
420 }
421
422 private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
423 final int packetId = decodeMessageId(buffer);
424
425 final int packetIdNumberOfBytesConsumed = 2;
426 if (bytesRemainingInVariablePart > 3) {
427 final byte reasonCode = buffer.readByte();
428 final Result<MqttProperties> properties = decodeProperties(buffer);
429 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
430 return new MqttPubReplyMessageVariableHeader(packetId,
431 reasonCode,
432 properties.value);
433 } else if (bytesRemainingInVariablePart > 2) {
434 final byte reasonCode = buffer.readByte();
435 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
436 return new MqttPubReplyMessageVariableHeader(packetId,
437 reasonCode,
438 MqttProperties.NO_PROPERTIES);
439 } else {
440 bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
441 return new MqttPubReplyMessageVariableHeader(packetId,
442 (byte) 0,
443 MqttProperties.NO_PROPERTIES);
444 }
445 }
446
447 private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
448 ByteBuf buffer) {
449 final byte reasonCode;
450 final MqttProperties properties;
451 if (bytesRemainingInVariablePart > 1) {
452 reasonCode = buffer.readByte();
453 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
454 properties = propertiesResult.value;
455 bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
456 } else if (bytesRemainingInVariablePart > 0) {
457 reasonCode = buffer.readByte();
458 properties = MqttProperties.NO_PROPERTIES;
459 --bytesRemainingInVariablePart;
460 } else {
461 reasonCode = 0;
462 properties = MqttProperties.NO_PROPERTIES;
463 }
464
465 return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
466 }
467
468 private MqttPublishVariableHeader decodePublishVariableHeader(
469 ChannelHandlerContext ctx,
470 ByteBuf buffer,
471 MqttFixedHeader mqttFixedHeader) {
472 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
473 final Result<String> decodedTopic = decodeString(buffer);
474 if (!isValidPublishTopicName(decodedTopic.value)) {
475 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
476 }
477 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
478
479 int messageId = -1;
480 if (mqttFixedHeader.qosLevel().value() > 0) {
481 messageId = decodeMessageId(buffer);
482 numberOfBytesConsumed += 2;
483 }
484
485 final MqttProperties properties;
486 if (mqttVersion == MqttVersion.MQTT_5) {
487 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
488 properties = propertiesResult.value;
489 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
490 } else {
491 properties = MqttProperties.NO_PROPERTIES;
492 }
493
494 bytesRemainingInVariablePart -= numberOfBytesConsumed;
495 return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
496 }
497
498
499
500
501 private static int decodeMessageId(ByteBuf buffer) {
502 final int messageId = decodeMsbLsb(buffer);
503 if (!isValidMessageId(messageId)) {
504 throw new DecoderException("invalid messageId: " + messageId);
505 }
506 return messageId;
507 }
508
509
510
511
512
513
514
515
516
517 private Object decodePayload(
518 ChannelHandlerContext ctx,
519 ByteBuf buffer,
520 MqttMessageType messageType,
521 int maxClientIdLength,
522 Object variableHeader) {
523 switch (messageType) {
524 case CONNECT:
525 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
526
527 case SUBSCRIBE:
528 return decodeSubscribePayload(buffer);
529
530 case SUBACK:
531 return decodeSubackPayload(buffer);
532
533 case UNSUBSCRIBE:
534 return decodeUnsubscribePayload(buffer);
535
536 case UNSUBACK:
537 return decodeUnsubAckPayload(ctx, buffer);
538
539 case PUBLISH:
540 return decodePublishPayload(buffer);
541
542 default:
543
544
545
546
547 validateNoBytesRemain(0);
548 return null;
549 }
550 }
551
552 private MqttConnectPayload decodeConnectionPayload(
553 ByteBuf buffer,
554 int maxClientIdLength,
555 MqttConnectVariableHeader mqttConnectVariableHeader) {
556 final Result<String> decodedClientId = decodeString(buffer);
557 final String decodedClientIdValue = decodedClientId.value;
558 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
559 (byte) mqttConnectVariableHeader.version());
560 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
561 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
562 }
563 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
564
565 Result<String> decodedWillTopic = null;
566 byte[] decodedWillMessage = null;
567
568 final MqttProperties willProperties;
569 if (mqttConnectVariableHeader.isWillFlag()) {
570 if (mqttVersion == MqttVersion.MQTT_5) {
571 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
572 willProperties = propertiesResult.value;
573 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
574 } else {
575 willProperties = MqttProperties.NO_PROPERTIES;
576 }
577 decodedWillTopic = decodeString(buffer, 0, 32767);
578 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
579 decodedWillMessage = decodeByteArray(buffer);
580 numberOfBytesConsumed += decodedWillMessage.length + 2;
581 } else {
582 willProperties = MqttProperties.NO_PROPERTIES;
583 }
584 Result<String> decodedUserName = null;
585 byte[] decodedPassword = null;
586 if (mqttConnectVariableHeader.hasUserName()) {
587 decodedUserName = decodeString(buffer);
588 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
589 }
590 if (mqttConnectVariableHeader.hasPassword()) {
591 decodedPassword = decodeByteArray(buffer);
592 numberOfBytesConsumed += decodedPassword.length + 2;
593 }
594
595 validateNoBytesRemain(numberOfBytesConsumed);
596 return new MqttConnectPayload(
597 decodedClientId.value,
598 willProperties,
599 decodedWillTopic != null ? decodedWillTopic.value : null,
600 decodedWillMessage,
601 decodedUserName != null ? decodedUserName.value : null,
602 decodedPassword);
603 }
604
605 private MqttSubscribePayload decodeSubscribePayload(
606 ByteBuf buffer) {
607 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
608 int numberOfBytesConsumed = 0;
609 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
610 final Result<String> decodedTopicName = decodeString(buffer);
611 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
612
613 final short optionByte = buffer.readUnsignedByte();
614
615 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
616 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
617 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
618 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
619
620 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
621 noLocal,
622 retainAsPublished,
623 retainHandling);
624
625 numberOfBytesConsumed++;
626 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
627 }
628 validateNoBytesRemain(numberOfBytesConsumed);
629 return new MqttSubscribePayload(subscribeTopics);
630 }
631
632 private MqttSubAckPayload decodeSubackPayload(
633 ByteBuf buffer) {
634 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
635 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
636 int numberOfBytesConsumed = 0;
637 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
638 int reasonCode = buffer.readUnsignedByte();
639 numberOfBytesConsumed++;
640 grantedQos.add(reasonCode);
641 }
642 validateNoBytesRemain(numberOfBytesConsumed);
643 return new MqttSubAckPayload(grantedQos);
644 }
645
646 private MqttUnsubAckPayload decodeUnsubAckPayload(
647 ChannelHandlerContext ctx,
648 ByteBuf buffer) {
649 int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
650 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
651 int numberOfBytesConsumed = 0;
652 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
653 short reasonCode = buffer.readUnsignedByte();
654 numberOfBytesConsumed++;
655 reasonCodes.add(reasonCode);
656 }
657 validateNoBytesRemain(numberOfBytesConsumed);
658 return new MqttUnsubAckPayload(reasonCodes);
659 }
660
661 private MqttUnsubscribePayload decodeUnsubscribePayload(
662 ByteBuf buffer) {
663 final List<String> unsubscribeTopics = new ArrayList<String>();
664 int numberOfBytesConsumed = 0;
665 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
666 final Result<String> decodedTopicName = decodeString(buffer);
667 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
668 unsubscribeTopics.add(decodedTopicName.value);
669 }
670 validateNoBytesRemain(numberOfBytesConsumed);
671 return new MqttUnsubscribePayload(unsubscribeTopics);
672 }
673
674 private ByteBuf decodePublishPayload(ByteBuf buffer) {
675 return buffer.readRetainedSlice(bytesRemainingInVariablePart);
676 }
677
678 private void validateNoBytesRemain(int numberOfBytesConsumed) {
679 bytesRemainingInVariablePart -= numberOfBytesConsumed;
680 if (bytesRemainingInVariablePart != 0) {
681 throw new DecoderException(
682 "non-zero remaining payload bytes: " +
683 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
684 }
685 }
686
687 private static Result<String> decodeString(ByteBuf buffer) {
688 return decodeString(buffer, 0, Integer.MAX_VALUE);
689 }
690
691 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
692 int size = decodeMsbLsb(buffer);
693 int numberOfBytesConsumed = 2;
694 if (size < minBytes || size > maxBytes) {
695 buffer.skipBytes(size);
696 numberOfBytesConsumed += size;
697 return new Result<String>(null, numberOfBytesConsumed);
698 }
699 String s = buffer.readString(size, CharsetUtil.UTF_8);
700 numberOfBytesConsumed += size;
701 return new Result<String>(s, numberOfBytesConsumed);
702 }
703
704
705
706
707
708 private static byte[] decodeByteArray(ByteBuf buffer) {
709 int size = decodeMsbLsb(buffer);
710 byte[] bytes = new byte[size];
711 buffer.readBytes(bytes);
712 return bytes;
713 }
714
715
716 private static long packInts(int a, int b) {
717 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
718 }
719
720 private static int unpackA(long ints) {
721 return (int) (ints >> 32);
722 }
723
724 private static int unpackB(long ints) {
725 return (int) ints;
726 }
727
728
729
730
731 private static int decodeMsbLsb(ByteBuf buffer) {
732 int min = 0;
733 int max = 65535;
734 short msbSize = buffer.readUnsignedByte();
735 short lsbSize = buffer.readUnsignedByte();
736 int result = msbSize << 8 | lsbSize;
737 if (result < min || result > max) {
738 result = -1;
739 }
740 return result;
741 }
742
743
744
745
746
747
748
749
750 private static long decodeVariableByteInteger(ByteBuf buffer) {
751 int remainingLength = 0;
752 int multiplier = 1;
753
754 for (int i = 0; i < 4; i++) {
755 short digit = buffer.readUnsignedByte();
756 remainingLength += (digit & 127) * multiplier;
757
758 if ((digit & 128) == 0) {
759 return packInts(remainingLength, i + 1);
760 }
761
762 multiplier *= 128;
763 }
764
765 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
766 }
767
768 private static final class Result<T> {
769
770 private final T value;
771 private final int numberOfBytesConsumed;
772
773 Result(T value, int numberOfBytesConsumed) {
774 this.value = value;
775 this.numberOfBytesConsumed = numberOfBytesConsumed;
776 }
777 }
778
779 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
780 final long propertiesLength = decodeVariableByteInteger(buffer);
781 int totalPropertiesLength = unpackA(propertiesLength);
782 int numberOfBytesConsumed = unpackB(propertiesLength);
783 if (buffer.readableBytes() < totalPropertiesLength) {
784
785 buffer.readSlice(totalPropertiesLength);
786 }
787
788 MqttProperties decodedProperties = new MqttProperties();
789 while (numberOfBytesConsumed < totalPropertiesLength) {
790 long propertyId = decodeVariableByteInteger(buffer);
791 final int propertyIdValue = unpackA(propertyId);
792 numberOfBytesConsumed += unpackB(propertyId);
793 switch (propertyIdValue) {
794 case PAYLOAD_FORMAT_INDICATOR:
795 case REQUEST_PROBLEM_INFORMATION:
796 case REQUEST_RESPONSE_INFORMATION:
797 case MAXIMUM_QOS:
798 case RETAIN_AVAILABLE:
799 case WILDCARD_SUBSCRIPTION_AVAILABLE:
800 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
801 case SHARED_SUBSCRIPTION_AVAILABLE:
802 final int b1 = buffer.readUnsignedByte();
803 numberOfBytesConsumed++;
804 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
805 break;
806 case SERVER_KEEP_ALIVE:
807 case RECEIVE_MAXIMUM:
808 case TOPIC_ALIAS_MAXIMUM:
809 case TOPIC_ALIAS:
810 final int int2BytesResult = decodeMsbLsb(buffer);
811 numberOfBytesConsumed += 2;
812 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
813 break;
814 case PUBLICATION_EXPIRY_INTERVAL:
815 case SESSION_EXPIRY_INTERVAL:
816 case WILL_DELAY_INTERVAL:
817 case MAXIMUM_PACKET_SIZE:
818 final int maxPacketSize = buffer.readInt();
819 numberOfBytesConsumed += 4;
820 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
821 break;
822 case SUBSCRIPTION_IDENTIFIER:
823 long vbIntegerResult = decodeVariableByteInteger(buffer);
824 numberOfBytesConsumed += unpackB(vbIntegerResult);
825 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
826 break;
827 case CONTENT_TYPE:
828 case RESPONSE_TOPIC:
829 case ASSIGNED_CLIENT_IDENTIFIER:
830 case AUTHENTICATION_METHOD:
831 case RESPONSE_INFORMATION:
832 case SERVER_REFERENCE:
833 case REASON_STRING:
834 final Result<String> stringResult = decodeString(buffer);
835 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
836 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
837 break;
838 case USER_PROPERTY:
839 final Result<String> keyResult = decodeString(buffer);
840 final Result<String> valueResult = decodeString(buffer);
841 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
842 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
843 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
844 break;
845 case CORRELATION_DATA:
846 case AUTHENTICATION_DATA:
847 final byte[] binaryDataResult = decodeByteArray(buffer);
848 numberOfBytesConsumed += binaryDataResult.length + 2;
849 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
850 break;
851 default:
852
853 throw new DecoderException("Unknown property type: " + propertyIdValue);
854 }
855 }
856
857 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
858 }
859 }