View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
40  
41  /**
42   * Decodes Mqtt messages from bytes, following
43   * the MQTT protocol specification
44   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
45   * or
46   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
47   * version specified in the CONNECT message that first goes through the channel.
48   */
49  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
50  
51      /**
52       * States of the decoder.
53       * We start at READ_FIXED_HEADER, followed by
54       * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
55       */
56      enum DecoderState {
57          READ_FIXED_HEADER,
58          READ_VARIABLE_HEADER,
59          READ_PAYLOAD,
60          BAD_MESSAGE,
61      }
62  
63      private MqttFixedHeader mqttFixedHeader;
64      private Object variableHeader;
65      private int bytesRemainingInVariablePart;
66  
67      private final int maxBytesInMessage;
68      private final int maxClientIdLength;
69  
70      public MqttDecoder() {
71        this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
72      }
73  
74      public MqttDecoder(int maxBytesInMessage) {
75          this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
76      }
77  
78      public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
79          super(DecoderState.READ_FIXED_HEADER);
80          this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
81          this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
82      }
83  
84      @Override
85      protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
86          switch (state()) {
87              case READ_FIXED_HEADER: try {
88                  mqttFixedHeader = decodeFixedHeader(ctx, buffer);
89                  bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
90                  checkpoint(DecoderState.READ_VARIABLE_HEADER);
91                  // fall through
92              } catch (Exception cause) {
93                  out.add(invalidMessage(cause));
94                  return;
95              }
96  
97              case READ_VARIABLE_HEADER:  try {
98                  final Result<?> decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
99                  variableHeader = decodedVariableHeader.value;
100                 if (bytesRemainingInVariablePart > maxBytesInMessage) {
101                     buffer.skipBytes(actualReadableBytes());
102                     throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes");
103                 }
104                 bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
105                 checkpoint(DecoderState.READ_PAYLOAD);
106                 // fall through
107             } catch (Exception cause) {
108                 out.add(invalidMessage(cause));
109                 return;
110             }
111 
112             case READ_PAYLOAD: try {
113                 final Result<?> decodedPayload =
114                         decodePayload(
115                                 ctx,
116                                 buffer,
117                                 mqttFixedHeader.messageType(),
118                                 bytesRemainingInVariablePart,
119                                 maxClientIdLength,
120                                 variableHeader);
121                 bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
122                 if (bytesRemainingInVariablePart != 0) {
123                     throw new DecoderException(
124                             "non-zero remaining payload bytes: " +
125                                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
126                 }
127                 checkpoint(DecoderState.READ_FIXED_HEADER);
128                 MqttMessage message = MqttMessageFactory.newMessage(
129                         mqttFixedHeader, variableHeader, decodedPayload.value);
130                 mqttFixedHeader = null;
131                 variableHeader = null;
132                 out.add(message);
133                 break;
134             } catch (Exception cause) {
135                 out.add(invalidMessage(cause));
136                 return;
137             }
138 
139             case BAD_MESSAGE:
140                 // Keep discarding until disconnection.
141                 buffer.skipBytes(actualReadableBytes());
142                 break;
143 
144             default:
145                 // Shouldn't reach here.
146                 throw new Error();
147         }
148     }
149 
150     private MqttMessage invalidMessage(Throwable cause) {
151       checkpoint(DecoderState.BAD_MESSAGE);
152       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
153     }
154 
155     /**
156      * Decodes the fixed header. It's one byte for the flags and then variable
157      * bytes for the remaining length.
158      *
159      * @see
160      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
161      *
162      * @param buffer the buffer to decode from
163      * @return the fixed header
164      */
165     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
166         short b1 = buffer.readUnsignedByte();
167 
168         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
169         boolean dupFlag = (b1 & 0x08) == 0x08;
170         int qosLevel = (b1 & 0x06) >> 1;
171         boolean retain = (b1 & 0x01) != 0;
172 
173         switch (messageType) {
174             case PUBLISH:
175                 if (qosLevel == 3) {
176                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
177                             + qosLevel + ')');
178                 }
179                 break;
180 
181             case PUBREL:
182             case SUBSCRIBE:
183             case UNSUBSCRIBE:
184                 if (dupFlag) {
185                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
186                             + " message, must be 0, found 1");
187                 }
188                 if (qosLevel != 1) {
189                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
190                             + " message, must be 1, found " + qosLevel);
191                 }
192                 if (retain) {
193                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
194                             + " message, must be 0, found 1");
195                 }
196                 break;
197 
198             case AUTH:
199             case CONNACK:
200             case CONNECT:
201             case DISCONNECT:
202             case PINGREQ:
203             case PINGRESP:
204             case PUBACK:
205             case PUBCOMP:
206             case PUBREC:
207             case SUBACK:
208             case UNSUBACK:
209                 if (dupFlag) {
210                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
211                             + " message, must be 0, found 1");
212                 }
213                 if (qosLevel != 0) {
214                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
215                             + " message, must be 0, found " + qosLevel);
216                 }
217                 if (retain) {
218                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
219                             + " message, must be 0, found 1");
220                 }
221                 break;
222             default:
223                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
224         }
225 
226         int remainingLength = 0;
227         int multiplier = 1;
228         short digit;
229         int loops = 0;
230         do {
231             digit = buffer.readUnsignedByte();
232             remainingLength += (digit & 127) * multiplier;
233             multiplier *= 128;
234             loops++;
235         } while ((digit & 128) != 0 && loops < 4);
236 
237         // MQTT protocol limits Remaining Length to 4 bytes
238         if (loops == 4 && (digit & 128) != 0) {
239             throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
240         }
241         MqttFixedHeader decodedFixedHeader =
242                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
243         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
244     }
245 
246     /**
247      * Decodes the variable header (if any)
248      * @param buffer the buffer to decode from
249      * @param mqttFixedHeader MqttFixedHeader of the same message
250      * @return the variable header
251      */
252     private Result<?> decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
253         switch (mqttFixedHeader.messageType()) {
254             case CONNECT:
255                 return decodeConnectionVariableHeader(ctx, buffer);
256 
257             case CONNACK:
258                 return decodeConnAckVariableHeader(ctx, buffer);
259 
260             case UNSUBSCRIBE:
261             case SUBSCRIBE:
262             case SUBACK:
263             case UNSUBACK:
264                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
265 
266             case PUBACK:
267             case PUBREC:
268             case PUBCOMP:
269             case PUBREL:
270                 return decodePubReplyMessage(buffer);
271 
272             case PUBLISH:
273                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
274 
275             case DISCONNECT:
276             case AUTH:
277                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
278 
279             case PINGREQ:
280             case PINGRESP:
281                 // Empty variable header
282                 return new Result<Object>(null, 0);
283             default:
284                 //shouldn't reach here
285                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
286         }
287     }
288 
289     private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(
290             ChannelHandlerContext ctx,
291             ByteBuf buffer) {
292         final Result<String> protoString = decodeString(buffer);
293         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
294 
295         final byte protocolLevel = buffer.readByte();
296         numberOfBytesConsumed += 1;
297 
298         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
299         MqttCodecUtil.setMqttVersion(ctx, version);
300 
301         final int b1 = buffer.readUnsignedByte();
302         numberOfBytesConsumed += 1;
303 
304         final int keepAlive = decodeMsbLsb(buffer);
305         numberOfBytesConsumed += 2;
306 
307         final boolean hasUserName = (b1 & 0x80) == 0x80;
308         final boolean hasPassword = (b1 & 0x40) == 0x40;
309         final boolean willRetain = (b1 & 0x20) == 0x20;
310         final int willQos = (b1 & 0x18) >> 3;
311         final boolean willFlag = (b1 & 0x04) == 0x04;
312         final boolean cleanSession = (b1 & 0x02) == 0x02;
313         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
314             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
315             if (!zeroReservedFlag) {
316                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
317                 // set to zero and disconnect the Client if it is not zero.
318                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
319                 throw new DecoderException("non-zero reserved flag");
320             }
321         }
322 
323         final MqttProperties properties;
324         if (version == MqttVersion.MQTT_5) {
325             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
326             properties = propertiesResult.value;
327             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
328         } else {
329             properties = MqttProperties.NO_PROPERTIES;
330         }
331 
332         final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
333                 version.protocolName(),
334                 version.protocolLevel(),
335                 hasUserName,
336                 hasPassword,
337                 willRetain,
338                 willQos,
339                 willFlag,
340                 cleanSession,
341                 keepAlive,
342                 properties);
343         return new Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
344     }
345 
346     private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(
347             ChannelHandlerContext ctx,
348             ByteBuf buffer) {
349         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
350         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
351         byte returnCode = buffer.readByte();
352         int numberOfBytesConsumed = 2;
353 
354         final MqttProperties properties;
355         if (mqttVersion == MqttVersion.MQTT_5) {
356             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
357             properties = propertiesResult.value;
358             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
359         } else {
360             properties = MqttProperties.NO_PROPERTIES;
361         }
362 
363         final MqttConnAckVariableHeader mqttConnAckVariableHeader =
364                 new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
365         return new Result<MqttConnAckVariableHeader>(mqttConnAckVariableHeader, numberOfBytesConsumed);
366     }
367 
368     private static Result<MqttMessageIdAndPropertiesVariableHeader> decodeMessageIdAndPropertiesVariableHeader(
369             ChannelHandlerContext ctx,
370             ByteBuf buffer) {
371         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
372         final int packetId = decodeMessageId(buffer);
373 
374         final MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader;
375         final int mqtt5Consumed;
376 
377         if (mqttVersion == MqttVersion.MQTT_5) {
378             final Result<MqttProperties> properties = decodeProperties(buffer);
379             mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
380             mqtt5Consumed = properties.numberOfBytesConsumed;
381         } else {
382             mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId,
383                     MqttProperties.NO_PROPERTIES);
384             mqtt5Consumed = 0;
385         }
386 
387         return new Result<MqttMessageIdAndPropertiesVariableHeader>(mqttVariableHeader,
388                 2 + mqtt5Consumed);
389     }
390 
391     private Result<MqttPubReplyMessageVariableHeader> decodePubReplyMessage(ByteBuf buffer) {
392         final int packetId = decodeMessageId(buffer);
393 
394         final MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader;
395         final int consumed;
396         final int packetIdNumberOfBytesConsumed = 2;
397         if (bytesRemainingInVariablePart > 3) {
398             final byte reasonCode = buffer.readByte();
399             final Result<MqttProperties> properties = decodeProperties(buffer);
400             mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
401                     reasonCode,
402                     properties.value);
403             consumed = packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
404         } else if (bytesRemainingInVariablePart > 2) {
405             final byte reasonCode = buffer.readByte();
406             mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
407                     reasonCode,
408                     MqttProperties.NO_PROPERTIES);
409             consumed = packetIdNumberOfBytesConsumed + 1;
410         } else {
411             mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
412                     (byte) 0,
413                     MqttProperties.NO_PROPERTIES);
414             consumed = packetIdNumberOfBytesConsumed;
415         }
416 
417         return new Result<MqttPubReplyMessageVariableHeader>(mqttPubAckVariableHeader, consumed);
418     }
419 
420     private Result<MqttReasonCodeAndPropertiesVariableHeader> decodeReasonCodeAndPropertiesVariableHeader(
421             ByteBuf buffer) {
422         final byte reasonCode;
423         final MqttProperties properties;
424         final int consumed;
425         if (bytesRemainingInVariablePart > 1) {
426             reasonCode = buffer.readByte();
427             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
428             properties = propertiesResult.value;
429             consumed = 1 + propertiesResult.numberOfBytesConsumed;
430         } else if (bytesRemainingInVariablePart > 0) {
431             reasonCode = buffer.readByte();
432             properties = MqttProperties.NO_PROPERTIES;
433             consumed = 1;
434         } else {
435             reasonCode = 0;
436             properties = MqttProperties.NO_PROPERTIES;
437             consumed = 0;
438         }
439         final MqttReasonCodeAndPropertiesVariableHeader mqttReasonAndPropsVariableHeader =
440                 new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
441 
442         return new Result<MqttReasonCodeAndPropertiesVariableHeader>(
443                 mqttReasonAndPropsVariableHeader,
444                 consumed);
445     }
446 
447     private Result<MqttPublishVariableHeader> decodePublishVariableHeader(
448             ChannelHandlerContext ctx,
449             ByteBuf buffer,
450             MqttFixedHeader mqttFixedHeader) {
451         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
452         final Result<String> decodedTopic = decodeString(buffer);
453         if (!isValidPublishTopicName(decodedTopic.value)) {
454             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
455         }
456         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
457 
458         int messageId = -1;
459         if (mqttFixedHeader.qosLevel().value() > 0) {
460             messageId = decodeMessageId(buffer);
461             numberOfBytesConsumed += 2;
462         }
463 
464         final MqttProperties properties;
465         if (mqttVersion == MqttVersion.MQTT_5) {
466             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
467             properties = propertiesResult.value;
468             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
469         } else {
470             properties = MqttProperties.NO_PROPERTIES;
471         }
472 
473         final MqttPublishVariableHeader mqttPublishVariableHeader =
474                 new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
475         return new Result<MqttPublishVariableHeader>(mqttPublishVariableHeader, numberOfBytesConsumed);
476     }
477 
478     /**
479      * @return messageId with numberOfBytesConsumed is 2
480      */
481     private static int decodeMessageId(ByteBuf buffer) {
482         final int messageId = decodeMsbLsb(buffer);
483         if (!isValidMessageId(messageId)) {
484             throw new DecoderException("invalid messageId: " + messageId);
485         }
486         return messageId;
487     }
488 
489     /**
490      * Decodes the payload.
491      *
492      * @param buffer the buffer to decode from
493      * @param messageType  type of the message being decoded
494      * @param bytesRemainingInVariablePart bytes remaining
495      * @param variableHeader variable header of the same message
496      * @return the payload
497      */
498     private static Result<?> decodePayload(
499             ChannelHandlerContext ctx,
500             ByteBuf buffer,
501             MqttMessageType messageType,
502             int bytesRemainingInVariablePart,
503             int maxClientIdLength,
504             Object variableHeader) {
505         switch (messageType) {
506             case CONNECT:
507                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
508 
509             case SUBSCRIBE:
510                 return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
511 
512             case SUBACK:
513                 return decodeSubackPayload(buffer, bytesRemainingInVariablePart);
514 
515             case UNSUBSCRIBE:
516                 return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
517 
518             case UNSUBACK:
519                 return decodeUnsubAckPayload(ctx, buffer, bytesRemainingInVariablePart);
520 
521             case PUBLISH:
522                 return decodePublishPayload(buffer, bytesRemainingInVariablePart);
523 
524             default:
525                 // unknown payload , no byte consumed
526                 return new Result<Object>(null, 0);
527         }
528     }
529 
530     private static Result<MqttConnectPayload> decodeConnectionPayload(
531             ByteBuf buffer,
532             int maxClientIdLength,
533             MqttConnectVariableHeader mqttConnectVariableHeader) {
534         final Result<String> decodedClientId = decodeString(buffer);
535         final String decodedClientIdValue = decodedClientId.value;
536         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
537                 (byte) mqttConnectVariableHeader.version());
538         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
539             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
540         }
541         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
542 
543         Result<String> decodedWillTopic = null;
544         byte[] decodedWillMessage = null;
545 
546         final MqttProperties willProperties;
547         if (mqttConnectVariableHeader.isWillFlag()) {
548             if (mqttVersion == MqttVersion.MQTT_5) {
549                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
550                 willProperties = propertiesResult.value;
551                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
552             } else {
553                 willProperties = MqttProperties.NO_PROPERTIES;
554             }
555             decodedWillTopic = decodeString(buffer, 0, 32767);
556             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
557             decodedWillMessage = decodeByteArray(buffer);
558             numberOfBytesConsumed += decodedWillMessage.length + 2;
559         } else {
560             willProperties = MqttProperties.NO_PROPERTIES;
561         }
562         Result<String> decodedUserName = null;
563         byte[] decodedPassword = null;
564         if (mqttConnectVariableHeader.hasUserName()) {
565             decodedUserName = decodeString(buffer);
566             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
567         }
568         if (mqttConnectVariableHeader.hasPassword()) {
569             decodedPassword = decodeByteArray(buffer);
570             numberOfBytesConsumed += decodedPassword.length + 2;
571         }
572 
573         final MqttConnectPayload mqttConnectPayload =
574                 new MqttConnectPayload(
575                         decodedClientId.value,
576                         willProperties,
577                         decodedWillTopic != null ? decodedWillTopic.value : null,
578                         decodedWillMessage,
579                         decodedUserName != null ? decodedUserName.value : null,
580                         decodedPassword);
581         return new Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
582     }
583 
584     private static Result<MqttSubscribePayload> decodeSubscribePayload(
585             ByteBuf buffer,
586             int bytesRemainingInVariablePart) {
587         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
588         int numberOfBytesConsumed = 0;
589         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
590             final Result<String> decodedTopicName = decodeString(buffer);
591             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
592             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
593             final short optionByte = buffer.readUnsignedByte();
594 
595             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
596             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
597             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
598             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
599 
600             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
601                     noLocal,
602                     retainAsPublished,
603                     retainHandling);
604 
605             numberOfBytesConsumed++;
606             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
607         }
608         return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
609     }
610 
611     private static Result<MqttSubAckPayload> decodeSubackPayload(
612             ByteBuf buffer,
613             int bytesRemainingInVariablePart) {
614         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
615         int numberOfBytesConsumed = 0;
616         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
617             int reasonCode = buffer.readUnsignedByte();
618             numberOfBytesConsumed++;
619             grantedQos.add(reasonCode);
620         }
621         return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
622     }
623 
624     private static Result<MqttUnsubAckPayload> decodeUnsubAckPayload(
625         ChannelHandlerContext ctx,
626         ByteBuf buffer,
627         int bytesRemainingInVariablePart) {
628         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
629         int numberOfBytesConsumed = 0;
630         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
631             short reasonCode = buffer.readUnsignedByte();
632             numberOfBytesConsumed++;
633             reasonCodes.add(reasonCode);
634         }
635         return new Result<MqttUnsubAckPayload>(new MqttUnsubAckPayload(reasonCodes), numberOfBytesConsumed);
636     }
637 
638     private static Result<MqttUnsubscribePayload> decodeUnsubscribePayload(
639             ByteBuf buffer,
640             int bytesRemainingInVariablePart) {
641         final List<String> unsubscribeTopics = new ArrayList<String>();
642         int numberOfBytesConsumed = 0;
643         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
644             final Result<String> decodedTopicName = decodeString(buffer);
645             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
646             unsubscribeTopics.add(decodedTopicName.value);
647         }
648         return new Result<MqttUnsubscribePayload>(
649                 new MqttUnsubscribePayload(unsubscribeTopics),
650                 numberOfBytesConsumed);
651     }
652 
653     private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
654         ByteBuf b = buffer.readRetainedSlice(bytesRemainingInVariablePart);
655         return new Result<ByteBuf>(b, bytesRemainingInVariablePart);
656     }
657 
658     private static Result<String> decodeString(ByteBuf buffer) {
659         return decodeString(buffer, 0, Integer.MAX_VALUE);
660     }
661 
662     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
663         int size = decodeMsbLsb(buffer);
664         int numberOfBytesConsumed = 2;
665         if (size < minBytes || size > maxBytes) {
666             buffer.skipBytes(size);
667             numberOfBytesConsumed += size;
668             return new Result<String>(null, numberOfBytesConsumed);
669         }
670         String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
671         buffer.skipBytes(size);
672         numberOfBytesConsumed += size;
673         return new Result<String>(s, numberOfBytesConsumed);
674     }
675 
676     /**
677      *
678      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
679      */
680     private static byte[] decodeByteArray(ByteBuf buffer) {
681         int size = decodeMsbLsb(buffer);
682         byte[] bytes = new byte[size];
683         buffer.readBytes(bytes);
684         return bytes;
685     }
686 
687     // packing utils to reduce the amount of garbage while decoding ints
688     private static long packInts(int a, int b) {
689         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
690     }
691 
692     private static int unpackA(long ints) {
693         return (int) (ints >> 32);
694     }
695 
696     private static int unpackB(long ints) {
697         return (int) ints;
698     }
699 
700     /**
701      *  numberOfBytesConsumed = 2. return decoded result.
702      */
703     private static int decodeMsbLsb(ByteBuf buffer) {
704         int min = 0;
705         int max = 65535;
706         short msbSize = buffer.readUnsignedByte();
707         short lsbSize = buffer.readUnsignedByte();
708         int result = msbSize << 8 | lsbSize;
709         if (result < min || result > max) {
710             result = -1;
711         }
712         return result;
713     }
714 
715     /**
716      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
717      *
718      * @param buffer the buffer to decode from
719      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
720      * @throws DecoderException if bad MQTT protocol limits Remaining Length
721      */
722     private static long decodeVariableByteInteger(ByteBuf buffer) {
723         int remainingLength = 0;
724         int multiplier = 1;
725         short digit;
726         int loops = 0;
727         do {
728             digit = buffer.readUnsignedByte();
729             remainingLength += (digit & 127) * multiplier;
730             multiplier *= 128;
731             loops++;
732         } while ((digit & 128) != 0 && loops < 4);
733 
734         if (loops == 4 && (digit & 128) != 0) {
735             throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
736         }
737         return packInts(remainingLength, loops);
738     }
739 
740     private static final class Result<T> {
741 
742         private final T value;
743         private final int numberOfBytesConsumed;
744 
745         Result(T value, int numberOfBytesConsumed) {
746             this.value = value;
747             this.numberOfBytesConsumed = numberOfBytesConsumed;
748         }
749     }
750 
751     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
752         final long propertiesLength = decodeVariableByteInteger(buffer);
753         int totalPropertiesLength = unpackA(propertiesLength);
754         int numberOfBytesConsumed = unpackB(propertiesLength);
755 
756         MqttProperties decodedProperties = new MqttProperties();
757         while (numberOfBytesConsumed < totalPropertiesLength) {
758             long propertyId = decodeVariableByteInteger(buffer);
759             final int propertyIdValue = unpackA(propertyId);
760             numberOfBytesConsumed += unpackB(propertyId);
761             MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
762             switch (propertyType) {
763                 case PAYLOAD_FORMAT_INDICATOR:
764                 case REQUEST_PROBLEM_INFORMATION:
765                 case REQUEST_RESPONSE_INFORMATION:
766                 case MAXIMUM_QOS:
767                 case RETAIN_AVAILABLE:
768                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
769                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
770                 case SHARED_SUBSCRIPTION_AVAILABLE:
771                     final int b1 = buffer.readUnsignedByte();
772                     numberOfBytesConsumed++;
773                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
774                     break;
775                 case SERVER_KEEP_ALIVE:
776                 case RECEIVE_MAXIMUM:
777                 case TOPIC_ALIAS_MAXIMUM:
778                 case TOPIC_ALIAS:
779                     final int int2BytesResult = decodeMsbLsb(buffer);
780                     numberOfBytesConsumed += 2;
781                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
782                     break;
783                 case PUBLICATION_EXPIRY_INTERVAL:
784                 case SESSION_EXPIRY_INTERVAL:
785                 case WILL_DELAY_INTERVAL:
786                 case MAXIMUM_PACKET_SIZE:
787                     final int maxPacketSize = buffer.readInt();
788                     numberOfBytesConsumed += 4;
789                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
790                     break;
791                 case SUBSCRIPTION_IDENTIFIER:
792                     long vbIntegerResult = decodeVariableByteInteger(buffer);
793                     numberOfBytesConsumed += unpackB(vbIntegerResult);
794                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
795                     break;
796                 case CONTENT_TYPE:
797                 case RESPONSE_TOPIC:
798                 case ASSIGNED_CLIENT_IDENTIFIER:
799                 case AUTHENTICATION_METHOD:
800                 case RESPONSE_INFORMATION:
801                 case SERVER_REFERENCE:
802                 case REASON_STRING:
803                     final Result<String> stringResult = decodeString(buffer);
804                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
805                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
806                     break;
807                 case USER_PROPERTY:
808                     final Result<String> keyResult = decodeString(buffer);
809                     final Result<String> valueResult = decodeString(buffer);
810                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
811                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
812                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
813                     break;
814                 case CORRELATION_DATA:
815                 case AUTHENTICATION_DATA:
816                     final byte[] binaryDataResult = decodeByteArray(buffer);
817                     numberOfBytesConsumed += binaryDataResult.length + 2;
818                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
819                     break;
820                 default:
821                     //shouldn't reach here
822                     throw new DecoderException("Unknown property type: " + propertyType);
823             }
824         }
825 
826         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
827     }
828 }