View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
40  
41  /**
42   * Decodes Mqtt messages from bytes, following
43   * the MQTT protocol specification
44   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
45   * or
46   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
47   * version specified in the CONNECT message that first goes through the channel.
48   */
49  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
50  
51      /**
52       * States of the decoder.
53       * We start at READ_FIXED_HEADER, followed by
54       * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
55       */
56      enum DecoderState {
57          READ_FIXED_HEADER,
58          READ_VARIABLE_HEADER,
59          READ_PAYLOAD,
60          BAD_MESSAGE,
61      }
62  
63      private MqttFixedHeader mqttFixedHeader;
64      private Object variableHeader;
65      private int bytesRemainingInVariablePart;
66  
67      private final int maxBytesInMessage;
68      private final int maxClientIdLength;
69  
70      public MqttDecoder() {
71        this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
72      }
73  
74      public MqttDecoder(int maxBytesInMessage) {
75          this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
76      }
77  
78      public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
79          super(DecoderState.READ_FIXED_HEADER);
80          this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
81          this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
82      }
83  
84      @Override
85      protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
86          switch (state()) {
87              case READ_FIXED_HEADER: try {
88                  mqttFixedHeader = decodeFixedHeader(ctx, buffer);
89                  bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
90                  checkpoint(DecoderState.READ_VARIABLE_HEADER);
91                  // fall through
92              } catch (Exception cause) {
93                  out.add(invalidMessage(cause));
94                  return;
95              }
96  
97              case READ_VARIABLE_HEADER:  try {
98                  int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
99                  variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
100                 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
101                     buffer.skipBytes(actualReadableBytes());
102                     throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
103                             + bytesRemainingBeforeVariableHeader);
104                 }
105                 checkpoint(DecoderState.READ_PAYLOAD);
106                 // fall through
107             } catch (Exception cause) {
108                 out.add(invalidMessage(cause));
109                 return;
110             }
111 
112             case READ_PAYLOAD: try {
113                 final Object decodedPayload =
114                         decodePayload(
115                                 ctx,
116                                 buffer,
117                                 mqttFixedHeader.messageType(),
118                                 maxClientIdLength,
119                                 variableHeader);
120                 checkpoint(DecoderState.READ_FIXED_HEADER);
121                 MqttMessage message = MqttMessageFactory.newMessage(
122                         mqttFixedHeader, variableHeader, decodedPayload);
123                 mqttFixedHeader = null;
124                 variableHeader = null;
125                 out.add(message);
126                 break;
127             } catch (Exception cause) {
128                 out.add(invalidMessage(cause));
129                 return;
130             }
131 
132             case BAD_MESSAGE:
133                 // Keep discarding until disconnection.
134                 buffer.skipBytes(actualReadableBytes());
135                 break;
136 
137             default:
138                 // Shouldn't reach here.
139                 throw new Error();
140         }
141     }
142 
143     private MqttMessage invalidMessage(Throwable cause) {
144       checkpoint(DecoderState.BAD_MESSAGE);
145       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
146     }
147 
148     /**
149      * Decodes the fixed header. It's one byte for the flags and then variable
150      * bytes for the remaining length.
151      *
152      * @see
153      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
154      *
155      * @param buffer the buffer to decode from
156      * @return the fixed header
157      */
158     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
159         short b1 = buffer.readUnsignedByte();
160 
161         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
162         boolean dupFlag = (b1 & 0x08) == 0x08;
163         int qosLevel = (b1 & 0x06) >> 1;
164         boolean retain = (b1 & 0x01) != 0;
165 
166         switch (messageType) {
167             case PUBLISH:
168                 if (qosLevel == 3) {
169                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
170                             + qosLevel + ')');
171                 }
172                 break;
173 
174             case PUBREL:
175             case SUBSCRIBE:
176             case UNSUBSCRIBE:
177                 if (dupFlag) {
178                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
179                             + " message, must be 0, found 1");
180                 }
181                 if (qosLevel != 1) {
182                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
183                             + " message, must be 1, found " + qosLevel);
184                 }
185                 if (retain) {
186                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
187                             + " message, must be 0, found 1");
188                 }
189                 break;
190 
191             case AUTH:
192             case CONNACK:
193             case CONNECT:
194             case DISCONNECT:
195             case PINGREQ:
196             case PINGRESP:
197             case PUBACK:
198             case PUBCOMP:
199             case PUBREC:
200             case SUBACK:
201             case UNSUBACK:
202                 if (dupFlag) {
203                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
204                             + " message, must be 0, found 1");
205                 }
206                 if (qosLevel != 0) {
207                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
208                             + " message, must be 0, found " + qosLevel);
209                 }
210                 if (retain) {
211                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
212                             + " message, must be 0, found 1");
213                 }
214                 break;
215             default:
216                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
217         }
218 
219         int remainingLength = parseRemainingLength(buffer, messageType);
220         MqttFixedHeader decodedFixedHeader =
221                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
222         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
223     }
224 
225     private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
226         int remainingLength = 0;
227         int multiplier = 1;
228 
229         for (int i = 0; i < 4; i++) {
230             short digit = buffer.readUnsignedByte();
231             remainingLength += (digit & 127) * multiplier;
232 
233             if ((digit & 128) == 0) {
234                 return remainingLength;
235             }
236 
237             multiplier *= 128;
238         }
239 
240         // MQTT protocol limits Remaining Length to 4 bytes
241         throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
242     }
243 
244     /**
245      * Decodes the variable header (if any)
246      * @param buffer the buffer to decode from
247      * @param mqttFixedHeader MqttFixedHeader of the same message
248      * @return the variable header
249      */
250     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
251         switch (mqttFixedHeader.messageType()) {
252             case CONNECT:
253                 return decodeConnectionVariableHeader(ctx, buffer);
254 
255             case CONNACK:
256                 return decodeConnAckVariableHeader(ctx, buffer);
257 
258             case UNSUBSCRIBE:
259             case SUBSCRIBE:
260             case SUBACK:
261             case UNSUBACK:
262                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
263 
264             case PUBACK:
265             case PUBREC:
266             case PUBCOMP:
267             case PUBREL:
268                 return decodePubReplyMessage(buffer);
269 
270             case PUBLISH:
271                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
272 
273             case DISCONNECT:
274             case AUTH:
275                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
276 
277             case PINGREQ:
278             case PINGRESP:
279                 // Empty variable header
280                 return null;
281             default:
282                 //shouldn't reach here
283                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
284         }
285     }
286 
287     private MqttConnectVariableHeader decodeConnectionVariableHeader(
288             ChannelHandlerContext ctx,
289             ByteBuf buffer) {
290         final String protoString = decodeStringAndDecreaseBytesRemaining(buffer);
291 
292         final byte protocolLevel = buffer.readByte();
293         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString, protocolLevel);
294         MqttCodecUtil.setMqttVersion(ctx, version);
295 
296         final int b1 = buffer.readUnsignedByte();
297         final int keepAlive = decodeMsbLsb(buffer);
298         final boolean hasUserName = (b1 & 0x80) == 0x80;
299         final boolean hasPassword = (b1 & 0x40) == 0x40;
300         final boolean willRetain = (b1 & 0x20) == 0x20;
301         final int willQos = (b1 & 0x18) >> 3;
302         final boolean willFlag = (b1 & 0x04) == 0x04;
303         final boolean cleanSession = (b1 & 0x02) == 0x02;
304         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
305             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
306             if (!zeroReservedFlag) {
307                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
308                 // set to zero and disconnect the Client if it is not zero.
309                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
310                 throw new DecoderException("non-zero reserved flag");
311             }
312         }
313 
314         final MqttProperties properties = version == MqttVersion.MQTT_5
315                 ? decodeProperties(buffer)
316                 : MqttProperties.NO_PROPERTIES;
317 
318         bytesRemainingInVariablePart -= 4;
319         return new MqttConnectVariableHeader(
320                 version.protocolName(),
321                 version.protocolLevel(),
322                 hasUserName,
323                 hasPassword,
324                 willRetain,
325                 willQos,
326                 willFlag,
327                 cleanSession,
328                 keepAlive,
329                 properties);
330     }
331 
332     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
333             ChannelHandlerContext ctx,
334             ByteBuf buffer) {
335         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
336         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
337         byte returnCode = buffer.readByte();
338 
339         bytesRemainingInVariablePart -= 2;
340         final MqttProperties properties = mqttVersion == MqttVersion.MQTT_5
341                 ? decodeProperties(buffer)
342                 : MqttProperties.NO_PROPERTIES;
343 
344         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
345     }
346 
347     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
348             ChannelHandlerContext ctx,
349             ByteBuf buffer) {
350         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
351         final int packetId = decodeMessageId(buffer);
352 
353         bytesRemainingInVariablePart -= 2;
354         MqttProperties properties = mqttVersion == MqttVersion.MQTT_5
355                 ? decodeProperties(buffer)
356                 : MqttProperties.NO_PROPERTIES;
357         return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties);
358     }
359 
360     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
361         final int packetId = decodeMessageId(buffer);
362 
363         final int packetIdNumberOfBytesConsumed = 2;
364         if (bytesRemainingInVariablePart > 3) {
365             final byte reasonCode = buffer.readByte();
366             final MqttProperties properties = decodeProperties(buffer);
367             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
368             return new MqttPubReplyMessageVariableHeader(packetId,
369                     reasonCode,
370                     properties);
371         } else if (bytesRemainingInVariablePart > 2) {
372             final byte reasonCode = buffer.readByte();
373             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
374             return new MqttPubReplyMessageVariableHeader(packetId,
375                     reasonCode,
376                     MqttProperties.NO_PROPERTIES);
377         } else {
378             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
379             return new MqttPubReplyMessageVariableHeader(packetId,
380                     (byte) 0,
381                     MqttProperties.NO_PROPERTIES);
382         }
383     }
384 
385     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
386             ByteBuf buffer) {
387         final byte reasonCode;
388         final MqttProperties properties;
389         if (bytesRemainingInVariablePart > 1) {
390             reasonCode = buffer.readByte();
391             properties = decodeProperties(buffer);
392             --bytesRemainingInVariablePart;
393         } else if (bytesRemainingInVariablePart > 0) {
394             reasonCode = buffer.readByte();
395             properties = MqttProperties.NO_PROPERTIES;
396             --bytesRemainingInVariablePart;
397         } else {
398             reasonCode = 0;
399             properties = MqttProperties.NO_PROPERTIES;
400         }
401 
402         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
403     }
404 
405     private MqttPublishVariableHeader decodePublishVariableHeader(
406             ChannelHandlerContext ctx,
407             ByteBuf buffer,
408             MqttFixedHeader mqttFixedHeader) {
409         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
410         final String decodedTopic = decodeStringAndDecreaseBytesRemaining(buffer);
411         if (!isValidPublishTopicName(decodedTopic)) {
412             throw new DecoderException("invalid publish topic name: " + decodedTopic + " (contains wildcards)");
413         }
414 
415         int messageId = -1;
416         if (mqttFixedHeader.qosLevel().value() > 0) {
417             messageId = decodeMessageId(buffer);
418             bytesRemainingInVariablePart -= 2;
419         }
420 
421         final MqttProperties properties = mqttVersion == MqttVersion.MQTT_5
422                 ? decodeProperties(buffer)
423                 : MqttProperties.NO_PROPERTIES;
424 
425         return new MqttPublishVariableHeader(decodedTopic, messageId, properties);
426     }
427 
428     /**
429      * @return messageId with numberOfBytesConsumed is 2
430      */
431     private static int decodeMessageId(ByteBuf buffer) {
432         final int messageId = decodeMsbLsb(buffer);
433         if (!isValidMessageId(messageId)) {
434             throw new DecoderException("invalid messageId: " + messageId);
435         }
436         return messageId;
437     }
438 
439     /**
440      * Decodes the payload.
441      *
442      * @param buffer the buffer to decode from
443      * @param messageType  type of the message being decoded
444      * @param variableHeader variable header of the same message
445      * @return the payload
446      */
447     private Object decodePayload(
448             ChannelHandlerContext ctx,
449             ByteBuf buffer,
450             MqttMessageType messageType,
451             int maxClientIdLength,
452             Object variableHeader) {
453         switch (messageType) {
454             case CONNECT:
455                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
456 
457             case SUBSCRIBE:
458                 return decodeSubscribePayload(buffer);
459 
460             case SUBACK:
461                 return decodeSubackPayload(buffer);
462 
463             case UNSUBSCRIBE:
464                 return decodeUnsubscribePayload(buffer);
465 
466             case UNSUBACK:
467                 return decodeUnsubAckPayload(ctx, buffer);
468 
469             case PUBLISH:
470                 return decodePublishPayload(buffer);
471 
472             default:
473                 // unknown payload , no byte consumed
474                 return null;
475         }
476     }
477 
478     private MqttConnectPayload decodeConnectionPayload(
479             ByteBuf buffer,
480             int maxClientIdLength,
481             MqttConnectVariableHeader mqttConnectVariableHeader) {
482         String decodedClientId = decodeStringAndDecreaseBytesRemaining(buffer);
483         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
484                 (byte) mqttConnectVariableHeader.version());
485         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientId)) {
486             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientId);
487         }
488 
489         String decodedWillTopic = null;
490         byte[] decodedWillMessage = null;
491 
492         int numberOfBytesConsumed = 0;
493         final MqttProperties willProperties;
494         if (mqttConnectVariableHeader.isWillFlag()) {
495             if (mqttVersion == MqttVersion.MQTT_5) {
496                 willProperties = decodeProperties(buffer);
497             } else {
498                 willProperties = MqttProperties.NO_PROPERTIES;
499             }
500 
501             int willTopicSize = decodeMsbLsb(buffer);
502             numberOfBytesConsumed += 2 + willTopicSize;
503             if (willTopicSize <= 32767) {
504                 decodedWillTopic = buffer.toString(buffer.readerIndex(), willTopicSize, CharsetUtil.UTF_8);
505             }
506             buffer.skipBytes(willTopicSize);
507 
508             decodedWillMessage = decodeByteArray(buffer);
509             numberOfBytesConsumed += decodedWillMessage.length + 2;
510         } else {
511             willProperties = MqttProperties.NO_PROPERTIES;
512         }
513         String decodedUserName = null;
514         byte[] decodedPassword = null;
515         if (mqttConnectVariableHeader.hasUserName()) {
516             decodedUserName = decodeStringAndDecreaseBytesRemaining(buffer);
517         }
518         if (mqttConnectVariableHeader.hasPassword()) {
519             decodedPassword = decodeByteArray(buffer);
520             numberOfBytesConsumed += decodedPassword.length + 2;
521         }
522 
523         validateNoBytesRemain(numberOfBytesConsumed);
524         return new MqttConnectPayload(
525                         decodedClientId,
526                         willProperties,
527                         decodedWillTopic,
528                         decodedWillMessage,
529                         decodedUserName,
530                         decodedPassword);
531     }
532 
533     private MqttSubscribePayload decodeSubscribePayload(
534             ByteBuf buffer) {
535         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
536         int numberOfBytesConsumed = 0;
537         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
538             int topicNameSize = decodeMsbLsb(buffer);
539             String decodedTopicName = buffer.toString(buffer.readerIndex(), topicNameSize, CharsetUtil.UTF_8);
540             buffer.skipBytes(topicNameSize);
541             numberOfBytesConsumed += 2 + topicNameSize;
542 
543             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
544             final short optionByte = buffer.readUnsignedByte();
545 
546             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
547             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
548             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
549             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
550 
551             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
552                     noLocal,
553                     retainAsPublished,
554                     retainHandling);
555 
556             numberOfBytesConsumed++;
557             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName, subscriptionOption));
558         }
559         validateNoBytesRemain(numberOfBytesConsumed);
560         return new MqttSubscribePayload(subscribeTopics);
561     }
562 
563     private MqttSubAckPayload decodeSubackPayload(
564             ByteBuf buffer) {
565         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
566         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
567         int numberOfBytesConsumed = 0;
568         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
569             int reasonCode = buffer.readUnsignedByte();
570             numberOfBytesConsumed++;
571             grantedQos.add(reasonCode);
572         }
573         validateNoBytesRemain(numberOfBytesConsumed);
574         return new MqttSubAckPayload(grantedQos);
575     }
576 
577     private MqttUnsubAckPayload decodeUnsubAckPayload(
578         ChannelHandlerContext ctx,
579         ByteBuf buffer) {
580         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
581         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
582         int numberOfBytesConsumed = 0;
583         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
584             short reasonCode = buffer.readUnsignedByte();
585             numberOfBytesConsumed++;
586             reasonCodes.add(reasonCode);
587         }
588         validateNoBytesRemain(numberOfBytesConsumed);
589         return new MqttUnsubAckPayload(reasonCodes);
590     }
591 
592     private MqttUnsubscribePayload decodeUnsubscribePayload(
593             ByteBuf buffer) {
594         final List<String> unsubscribeTopics = new ArrayList<String>();
595         while (bytesRemainingInVariablePart > 0) {
596             final String decodedTopicName = decodeStringAndDecreaseBytesRemaining(buffer);
597             unsubscribeTopics.add(decodedTopicName);
598         }
599         return new MqttUnsubscribePayload(unsubscribeTopics);
600     }
601 
602     private ByteBuf decodePublishPayload(ByteBuf buffer) {
603         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
604     }
605 
606     private void validateNoBytesRemain(int numberOfBytesConsumed) {
607         bytesRemainingInVariablePart -= numberOfBytesConsumed;
608         if (bytesRemainingInVariablePart != 0) {
609             throw new DecoderException(
610                     "non-zero remaining payload bytes: " +
611                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
612         }
613     }
614 
615     private String decodeStringAndDecreaseBytesRemaining(ByteBuf buffer) {
616         int size = decodeMsbLsb(buffer);
617         bytesRemainingInVariablePart -= 2 + size;
618         String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
619         buffer.skipBytes(size);
620         return s;
621     }
622 
623     /**
624      *
625      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
626      */
627     private static byte[] decodeByteArray(ByteBuf buffer) {
628         int size = decodeMsbLsb(buffer);
629         byte[] bytes = new byte[size];
630         buffer.readBytes(bytes);
631         return bytes;
632     }
633 
634     // packing utils to reduce the amount of garbage while decoding ints
635     private static long packInts(int a, int b) {
636         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
637     }
638 
639     private static int unpackA(long ints) {
640         return (int) (ints >> 32);
641     }
642 
643     private static int unpackB(long ints) {
644         return (int) ints;
645     }
646 
647     /**
648      *  numberOfBytesConsumed = 2. return decoded result.
649      */
650     private static int decodeMsbLsb(ByteBuf buffer) {
651         int min = 0;
652         int max = 65535;
653         short msbSize = buffer.readUnsignedByte();
654         short lsbSize = buffer.readUnsignedByte();
655         int result = msbSize << 8 | lsbSize;
656         if (result < min || result > max) {
657             result = -1;
658         }
659         return result;
660     }
661 
662     /**
663      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
664      *
665      * @param buffer the buffer to decode from
666      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
667      * @throws DecoderException if bad MQTT protocol limits Remaining Length
668      */
669     private static long decodeVariableByteInteger(ByteBuf buffer) {
670         int remainingLength = 0;
671         int multiplier = 1;
672 
673         for (int i = 0; i < 4; i++) {
674             short digit = buffer.readUnsignedByte();
675             remainingLength += (digit & 127) * multiplier;
676 
677             if ((digit & 128) == 0) {
678                 return packInts(remainingLength, i + 1);
679             }
680 
681             multiplier *= 128;
682         }
683 
684         throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
685     }
686 
687     private MqttProperties decodeProperties(ByteBuf buffer) {
688         final long propertiesLength = decodeVariableByteInteger(buffer);
689         int totalPropertiesLength = unpackA(propertiesLength);
690         int numberOfBytesConsumed = unpackB(propertiesLength);
691 
692         MqttProperties decodedProperties = new MqttProperties();
693         while (numberOfBytesConsumed < totalPropertiesLength) {
694             long propertyId = decodeVariableByteInteger(buffer);
695             final int propertyIdValue = unpackA(propertyId);
696             numberOfBytesConsumed += unpackB(propertyId);
697             MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
698             switch (propertyType) {
699                 case PAYLOAD_FORMAT_INDICATOR:
700                 case REQUEST_PROBLEM_INFORMATION:
701                 case REQUEST_RESPONSE_INFORMATION:
702                 case MAXIMUM_QOS:
703                 case RETAIN_AVAILABLE:
704                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
705                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
706                 case SHARED_SUBSCRIPTION_AVAILABLE:
707                     final int b1 = buffer.readUnsignedByte();
708                     numberOfBytesConsumed++;
709                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
710                     break;
711                 case SERVER_KEEP_ALIVE:
712                 case RECEIVE_MAXIMUM:
713                 case TOPIC_ALIAS_MAXIMUM:
714                 case TOPIC_ALIAS:
715                     final int int2BytesResult = decodeMsbLsb(buffer);
716                     numberOfBytesConsumed += 2;
717                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
718                     break;
719                 case PUBLICATION_EXPIRY_INTERVAL:
720                 case SESSION_EXPIRY_INTERVAL:
721                 case WILL_DELAY_INTERVAL:
722                 case MAXIMUM_PACKET_SIZE:
723                     final int maxPacketSize = buffer.readInt();
724                     numberOfBytesConsumed += 4;
725                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
726                     break;
727                 case SUBSCRIPTION_IDENTIFIER:
728                     long vbIntegerResult = decodeVariableByteInteger(buffer);
729                     numberOfBytesConsumed += unpackB(vbIntegerResult);
730                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
731                     break;
732                 case CONTENT_TYPE:
733                 case RESPONSE_TOPIC:
734                 case ASSIGNED_CLIENT_IDENTIFIER:
735                 case AUTHENTICATION_METHOD:
736                 case RESPONSE_INFORMATION:
737                 case SERVER_REFERENCE:
738                 case REASON_STRING:
739                     int size = decodeMsbLsb(buffer);
740                     numberOfBytesConsumed += 2 + size;
741                     String string = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
742                     buffer.skipBytes(size);
743 
744                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, string));
745                     break;
746                 case USER_PROPERTY:
747                     int keySize = decodeMsbLsb(buffer);
748                     String key = buffer.toString(buffer.readerIndex(), keySize, CharsetUtil.UTF_8);
749                     buffer.skipBytes(keySize);
750 
751                     int valueSize = decodeMsbLsb(buffer);
752                     String value = buffer.toString(buffer.readerIndex(), valueSize, CharsetUtil.UTF_8);
753                     buffer.skipBytes(valueSize);
754 
755                     numberOfBytesConsumed += 4 + keySize + valueSize;
756                     decodedProperties.add(new MqttProperties.UserProperty(key, value));
757                     break;
758                 case CORRELATION_DATA:
759                 case AUTHENTICATION_DATA:
760                     final byte[] binaryDataResult = decodeByteArray(buffer);
761                     numberOfBytesConsumed += binaryDataResult.length + 2;
762                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
763                     break;
764                 default:
765                     //shouldn't reach here
766                     throw new DecoderException("Unknown property type: " + propertyType);
767             }
768         }
769 
770         bytesRemainingInVariablePart -= numberOfBytesConsumed;
771         return decodedProperties;
772     }
773 }