View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.Signal;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
41  
42  /**
43   * Decodes Mqtt messages from bytes, following
44   * the MQTT protocol specification
45   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
46   * or
47   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
48   * version specified in the CONNECT message that first goes through the channel.
49   */
50  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
51  
52      /**
53       * States of the decoder.
54       * We start at READ_FIXED_HEADER, followed by
55       * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
56       */
57      enum DecoderState {
58          READ_FIXED_HEADER,
59          READ_VARIABLE_HEADER,
60          READ_PAYLOAD,
61          BAD_MESSAGE,
62      }
63  
64      private MqttFixedHeader mqttFixedHeader;
65      private Object variableHeader;
66      private int bytesRemainingInVariablePart;
67  
68      private final int maxBytesInMessage;
69      private final int maxClientIdLength;
70  
71      public MqttDecoder() {
72        this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
73      }
74  
75      public MqttDecoder(int maxBytesInMessage) {
76          this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
77      }
78  
79      public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
80          super(DecoderState.READ_FIXED_HEADER);
81          this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
82          this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
83      }
84  
85      @Override
86      protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
87          switch (state()) {
88              case READ_FIXED_HEADER: try {
89                  mqttFixedHeader = decodeFixedHeader(ctx, buffer);
90                  bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
91                  checkpoint(DecoderState.READ_VARIABLE_HEADER);
92                  // fall through
93              } catch (Exception cause) {
94                  out.add(invalidMessage(cause));
95                  return;
96              }
97  
98              case READ_VARIABLE_HEADER:  try {
99                  int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
100                 boolean bailOut = false;
101                 try {
102                     variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
103                 } catch (Signal signal) {
104                     if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
105                         // We couldn't parse the complete message, and it's already too large.
106                         // Swallow the Signal (we don't need more data) and instead bail out
107                         // and throw the TooLongFrameException below.
108                         bailOut = true;
109                     } else {
110                         // Ask for REPLAY if the current message is within maxBytesInMessage.
111                         throw signal;
112                     }
113                 }
114                 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
115                     buffer.skipBytes(actualReadableBytes());
116                     throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
117                             + bytesRemainingBeforeVariableHeader);
118                 }
119                 checkpoint(DecoderState.READ_PAYLOAD);
120                 // fall through
121             } catch (Exception cause) {
122                 out.add(invalidMessage(cause));
123                 return;
124             }
125 
126             case READ_PAYLOAD: try {
127                 final Object decodedPayload =
128                         decodePayload(
129                                 ctx,
130                                 buffer,
131                                 mqttFixedHeader.messageType(),
132                                 maxClientIdLength,
133                                 variableHeader);
134                 checkpoint(DecoderState.READ_FIXED_HEADER);
135                 MqttMessage message = MqttMessageFactory.newMessage(
136                         mqttFixedHeader, variableHeader, decodedPayload);
137                 mqttFixedHeader = null;
138                 variableHeader = null;
139                 out.add(message);
140                 break;
141             } catch (Exception cause) {
142                 out.add(invalidMessage(cause));
143                 return;
144             }
145 
146             case BAD_MESSAGE:
147                 // Keep discarding until disconnection.
148                 buffer.skipBytes(actualReadableBytes());
149                 break;
150 
151             default:
152                 // Shouldn't reach here.
153                 throw new Error();
154         }
155     }
156 
157     private MqttMessage invalidMessage(Throwable cause) {
158       checkpoint(DecoderState.BAD_MESSAGE);
159       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
160     }
161 
162     /**
163      * Decodes the fixed header. It's one byte for the flags and then variable
164      * bytes for the remaining length.
165      *
166      * @see
167      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
168      *
169      * @param buffer the buffer to decode from
170      * @return the fixed header
171      */
172     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
173         short b1 = buffer.readUnsignedByte();
174 
175         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
176         boolean dupFlag = (b1 & 0x08) == 0x08;
177         int qosLevel = (b1 & 0x06) >> 1;
178         boolean retain = (b1 & 0x01) != 0;
179 
180         switch (messageType) {
181             case PUBLISH:
182                 if (qosLevel == 3) {
183                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
184                             + qosLevel + ')');
185                 }
186                 break;
187 
188             case PUBREL:
189             case SUBSCRIBE:
190             case UNSUBSCRIBE:
191                 if (dupFlag) {
192                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
193                             + " message, must be 0, found 1");
194                 }
195                 if (qosLevel != 1) {
196                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
197                             + " message, must be 1, found " + qosLevel);
198                 }
199                 if (retain) {
200                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
201                             + " message, must be 0, found 1");
202                 }
203                 break;
204 
205             case AUTH:
206             case CONNACK:
207             case CONNECT:
208             case DISCONNECT:
209             case PINGREQ:
210             case PINGRESP:
211             case PUBACK:
212             case PUBCOMP:
213             case PUBREC:
214             case SUBACK:
215             case UNSUBACK:
216                 if (dupFlag) {
217                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
218                             + " message, must be 0, found 1");
219                 }
220                 if (qosLevel != 0) {
221                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
222                             + " message, must be 0, found " + qosLevel);
223                 }
224                 if (retain) {
225                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
226                             + " message, must be 0, found 1");
227                 }
228                 break;
229             default:
230                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
231         }
232 
233         int remainingLength = 0;
234         int multiplier = 1;
235         short digit;
236         int loops = 0;
237         do {
238             digit = buffer.readUnsignedByte();
239             remainingLength += (digit & 127) * multiplier;
240             multiplier *= 128;
241             loops++;
242         } while ((digit & 128) != 0 && loops < 4);
243 
244         // MQTT protocol limits Remaining Length to 4 bytes
245         if (loops == 4 && (digit & 128) != 0) {
246             throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
247         }
248         MqttFixedHeader decodedFixedHeader =
249                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
250         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
251     }
252 
253     /**
254      * Decodes the variable header (if any)
255      * @param buffer the buffer to decode from
256      * @param mqttFixedHeader MqttFixedHeader of the same message
257      * @return the variable header
258      */
259     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
260         switch (mqttFixedHeader.messageType()) {
261             case CONNECT:
262                 return decodeConnectionVariableHeader(ctx, buffer);
263 
264             case CONNACK:
265                 return decodeConnAckVariableHeader(ctx, buffer);
266 
267             case UNSUBSCRIBE:
268             case SUBSCRIBE:
269             case SUBACK:
270             case UNSUBACK:
271                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
272 
273             case PUBACK:
274             case PUBREC:
275             case PUBCOMP:
276             case PUBREL:
277                 return decodePubReplyMessage(buffer);
278 
279             case PUBLISH:
280                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
281 
282             case DISCONNECT:
283             case AUTH:
284                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
285 
286             case PINGREQ:
287             case PINGRESP:
288                 // Empty variable header
289                 return null;
290             default:
291                 //shouldn't reach here
292                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
293         }
294     }
295 
296     private MqttConnectVariableHeader decodeConnectionVariableHeader(
297             ChannelHandlerContext ctx,
298             ByteBuf buffer) {
299         final Result<String> protoString = decodeString(buffer);
300         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
301 
302         final byte protocolLevel = buffer.readByte();
303         numberOfBytesConsumed += 1;
304 
305         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
306         MqttCodecUtil.setMqttVersion(ctx, version);
307 
308         final int b1 = buffer.readUnsignedByte();
309         numberOfBytesConsumed += 1;
310 
311         final int keepAlive = decodeMsbLsb(buffer);
312         numberOfBytesConsumed += 2;
313 
314         final boolean hasUserName = (b1 & 0x80) == 0x80;
315         final boolean hasPassword = (b1 & 0x40) == 0x40;
316         final boolean willRetain = (b1 & 0x20) == 0x20;
317         final int willQos = (b1 & 0x18) >> 3;
318         final boolean willFlag = (b1 & 0x04) == 0x04;
319         final boolean cleanSession = (b1 & 0x02) == 0x02;
320         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
321             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
322             if (!zeroReservedFlag) {
323                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
324                 // set to zero and disconnect the Client if it is not zero.
325                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
326                 throw new DecoderException("non-zero reserved flag");
327             }
328         }
329 
330         final MqttProperties properties;
331         if (version == MqttVersion.MQTT_5) {
332             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
333             properties = propertiesResult.value;
334             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
335         } else {
336             properties = MqttProperties.NO_PROPERTIES;
337         }
338 
339         bytesRemainingInVariablePart -= numberOfBytesConsumed;
340         return new MqttConnectVariableHeader(
341                 version.protocolName(),
342                 version.protocolLevel(),
343                 hasUserName,
344                 hasPassword,
345                 willRetain,
346                 willQos,
347                 willFlag,
348                 cleanSession,
349                 keepAlive,
350                 properties);
351     }
352 
353     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
354             ChannelHandlerContext ctx,
355             ByteBuf buffer) {
356         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
357         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
358         byte returnCode = buffer.readByte();
359 
360         final MqttProperties properties;
361         if (mqttVersion == MqttVersion.MQTT_5) {
362             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
363             properties = propertiesResult.value;
364             bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
365         } else {
366             properties = MqttProperties.NO_PROPERTIES;
367             bytesRemainingInVariablePart -= 2;
368         }
369 
370         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
371     }
372 
373     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
374             ChannelHandlerContext ctx,
375             ByteBuf buffer) {
376         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
377         final int packetId = decodeMessageId(buffer);
378 
379         if (mqttVersion == MqttVersion.MQTT_5) {
380             final Result<MqttProperties> properties = decodeProperties(buffer);
381             bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
382             return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
383         } else {
384             bytesRemainingInVariablePart -= 2;
385             return new MqttMessageIdAndPropertiesVariableHeader(packetId,
386                                                                 MqttProperties.NO_PROPERTIES);
387         }
388     }
389 
390     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
391         final int packetId = decodeMessageId(buffer);
392 
393         final int packetIdNumberOfBytesConsumed = 2;
394         if (bytesRemainingInVariablePart > 3) {
395             final byte reasonCode = buffer.readByte();
396             final Result<MqttProperties> properties = decodeProperties(buffer);
397             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
398             return new MqttPubReplyMessageVariableHeader(packetId,
399                     reasonCode,
400                     properties.value);
401         } else if (bytesRemainingInVariablePart > 2) {
402             final byte reasonCode = buffer.readByte();
403             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
404             return new MqttPubReplyMessageVariableHeader(packetId,
405                     reasonCode,
406                     MqttProperties.NO_PROPERTIES);
407         } else {
408             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
409             return new MqttPubReplyMessageVariableHeader(packetId,
410                     (byte) 0,
411                     MqttProperties.NO_PROPERTIES);
412         }
413     }
414 
415     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
416             ByteBuf buffer) {
417         final byte reasonCode;
418         final MqttProperties properties;
419         if (bytesRemainingInVariablePart > 1) {
420             reasonCode = buffer.readByte();
421             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
422             properties = propertiesResult.value;
423             bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
424         } else if (bytesRemainingInVariablePart > 0) {
425             reasonCode = buffer.readByte();
426             properties = MqttProperties.NO_PROPERTIES;
427             --bytesRemainingInVariablePart;
428         } else {
429             reasonCode = 0;
430             properties = MqttProperties.NO_PROPERTIES;
431         }
432 
433         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
434     }
435 
436     private MqttPublishVariableHeader decodePublishVariableHeader(
437             ChannelHandlerContext ctx,
438             ByteBuf buffer,
439             MqttFixedHeader mqttFixedHeader) {
440         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
441         final Result<String> decodedTopic = decodeString(buffer);
442         if (!isValidPublishTopicName(decodedTopic.value)) {
443             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
444         }
445         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
446 
447         int messageId = -1;
448         if (mqttFixedHeader.qosLevel().value() > 0) {
449             messageId = decodeMessageId(buffer);
450             numberOfBytesConsumed += 2;
451         }
452 
453         final MqttProperties properties;
454         if (mqttVersion == MqttVersion.MQTT_5) {
455             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
456             properties = propertiesResult.value;
457             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
458         } else {
459             properties = MqttProperties.NO_PROPERTIES;
460         }
461 
462         bytesRemainingInVariablePart -= numberOfBytesConsumed;
463         return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
464     }
465 
466     /**
467      * @return messageId with numberOfBytesConsumed is 2
468      */
469     private static int decodeMessageId(ByteBuf buffer) {
470         final int messageId = decodeMsbLsb(buffer);
471         if (!isValidMessageId(messageId)) {
472             throw new DecoderException("invalid messageId: " + messageId);
473         }
474         return messageId;
475     }
476 
477     /**
478      * Decodes the payload.
479      *
480      * @param buffer the buffer to decode from
481      * @param messageType  type of the message being decoded
482      * @param variableHeader variable header of the same message
483      * @return the payload
484      */
485     private Object decodePayload(
486             ChannelHandlerContext ctx,
487             ByteBuf buffer,
488             MqttMessageType messageType,
489             int maxClientIdLength,
490             Object variableHeader) {
491         switch (messageType) {
492             case CONNECT:
493                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
494 
495             case SUBSCRIBE:
496                 return decodeSubscribePayload(buffer);
497 
498             case SUBACK:
499                 return decodeSubackPayload(buffer);
500 
501             case UNSUBSCRIBE:
502                 return decodeUnsubscribePayload(buffer);
503 
504             case UNSUBACK:
505                 return decodeUnsubAckPayload(ctx, buffer);
506 
507             case PUBLISH:
508                 return decodePublishPayload(buffer);
509 
510             default:
511                 // unknown payload , no byte consumed
512                 return null;
513         }
514     }
515 
516     private MqttConnectPayload decodeConnectionPayload(
517             ByteBuf buffer,
518             int maxClientIdLength,
519             MqttConnectVariableHeader mqttConnectVariableHeader) {
520         final Result<String> decodedClientId = decodeString(buffer);
521         final String decodedClientIdValue = decodedClientId.value;
522         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
523                 (byte) mqttConnectVariableHeader.version());
524         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
525             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
526         }
527         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
528 
529         Result<String> decodedWillTopic = null;
530         byte[] decodedWillMessage = null;
531 
532         final MqttProperties willProperties;
533         if (mqttConnectVariableHeader.isWillFlag()) {
534             if (mqttVersion == MqttVersion.MQTT_5) {
535                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
536                 willProperties = propertiesResult.value;
537                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
538             } else {
539                 willProperties = MqttProperties.NO_PROPERTIES;
540             }
541             decodedWillTopic = decodeString(buffer, 0, 32767);
542             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
543             decodedWillMessage = decodeByteArray(buffer);
544             numberOfBytesConsumed += decodedWillMessage.length + 2;
545         } else {
546             willProperties = MqttProperties.NO_PROPERTIES;
547         }
548         Result<String> decodedUserName = null;
549         byte[] decodedPassword = null;
550         if (mqttConnectVariableHeader.hasUserName()) {
551             decodedUserName = decodeString(buffer);
552             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
553         }
554         if (mqttConnectVariableHeader.hasPassword()) {
555             decodedPassword = decodeByteArray(buffer);
556             numberOfBytesConsumed += decodedPassword.length + 2;
557         }
558 
559         validateNoBytesRemain(numberOfBytesConsumed);
560         return new MqttConnectPayload(
561                         decodedClientId.value,
562                         willProperties,
563                         decodedWillTopic != null ? decodedWillTopic.value : null,
564                         decodedWillMessage,
565                         decodedUserName != null ? decodedUserName.value : null,
566                         decodedPassword);
567     }
568 
569     private MqttSubscribePayload decodeSubscribePayload(
570             ByteBuf buffer) {
571         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
572         int numberOfBytesConsumed = 0;
573         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
574             final Result<String> decodedTopicName = decodeString(buffer);
575             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
576             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
577             final short optionByte = buffer.readUnsignedByte();
578 
579             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
580             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
581             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
582             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
583 
584             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
585                     noLocal,
586                     retainAsPublished,
587                     retainHandling);
588 
589             numberOfBytesConsumed++;
590             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
591         }
592         validateNoBytesRemain(numberOfBytesConsumed);
593         return new MqttSubscribePayload(subscribeTopics);
594     }
595 
596     private MqttSubAckPayload decodeSubackPayload(
597             ByteBuf buffer) {
598         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
599         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
600         int numberOfBytesConsumed = 0;
601         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
602             int reasonCode = buffer.readUnsignedByte();
603             numberOfBytesConsumed++;
604             grantedQos.add(reasonCode);
605         }
606         validateNoBytesRemain(numberOfBytesConsumed);
607         return new MqttSubAckPayload(grantedQos);
608     }
609 
610     private MqttUnsubAckPayload decodeUnsubAckPayload(
611         ChannelHandlerContext ctx,
612         ByteBuf buffer) {
613         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
614         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
615         int numberOfBytesConsumed = 0;
616         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
617             short reasonCode = buffer.readUnsignedByte();
618             numberOfBytesConsumed++;
619             reasonCodes.add(reasonCode);
620         }
621         validateNoBytesRemain(numberOfBytesConsumed);
622         return new MqttUnsubAckPayload(reasonCodes);
623     }
624 
625     private MqttUnsubscribePayload decodeUnsubscribePayload(
626             ByteBuf buffer) {
627         final List<String> unsubscribeTopics = new ArrayList<String>();
628         int numberOfBytesConsumed = 0;
629         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
630             final Result<String> decodedTopicName = decodeString(buffer);
631             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
632             unsubscribeTopics.add(decodedTopicName.value);
633         }
634         validateNoBytesRemain(numberOfBytesConsumed);
635         return new MqttUnsubscribePayload(unsubscribeTopics);
636     }
637 
638     private ByteBuf decodePublishPayload(ByteBuf buffer) {
639         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
640     }
641 
642     private void validateNoBytesRemain(int numberOfBytesConsumed) {
643         bytesRemainingInVariablePart -= numberOfBytesConsumed;
644         if (bytesRemainingInVariablePart != 0) {
645             throw new DecoderException(
646                     "non-zero remaining payload bytes: " +
647                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
648         }
649     }
650 
651     private static Result<String> decodeString(ByteBuf buffer) {
652         return decodeString(buffer, 0, Integer.MAX_VALUE);
653     }
654 
655     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
656         int size = decodeMsbLsb(buffer);
657         int numberOfBytesConsumed = 2;
658         if (size < minBytes || size > maxBytes) {
659             buffer.skipBytes(size);
660             numberOfBytesConsumed += size;
661             return new Result<String>(null, numberOfBytesConsumed);
662         }
663         String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
664         buffer.skipBytes(size);
665         numberOfBytesConsumed += size;
666         return new Result<String>(s, numberOfBytesConsumed);
667     }
668 
669     /**
670      *
671      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
672      */
673     private static byte[] decodeByteArray(ByteBuf buffer) {
674         int size = decodeMsbLsb(buffer);
675         byte[] bytes = new byte[size];
676         buffer.readBytes(bytes);
677         return bytes;
678     }
679 
680     // packing utils to reduce the amount of garbage while decoding ints
681     private static long packInts(int a, int b) {
682         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
683     }
684 
685     private static int unpackA(long ints) {
686         return (int) (ints >> 32);
687     }
688 
689     private static int unpackB(long ints) {
690         return (int) ints;
691     }
692 
693     /**
694      *  numberOfBytesConsumed = 2. return decoded result.
695      */
696     private static int decodeMsbLsb(ByteBuf buffer) {
697         int min = 0;
698         int max = 65535;
699         short msbSize = buffer.readUnsignedByte();
700         short lsbSize = buffer.readUnsignedByte();
701         int result = msbSize << 8 | lsbSize;
702         if (result < min || result > max) {
703             result = -1;
704         }
705         return result;
706     }
707 
708     /**
709      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
710      *
711      * @param buffer the buffer to decode from
712      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
713      * @throws DecoderException if bad MQTT protocol limits Remaining Length
714      */
715     private static long decodeVariableByteInteger(ByteBuf buffer) {
716         int remainingLength = 0;
717         int multiplier = 1;
718         short digit;
719         int loops = 0;
720         do {
721             digit = buffer.readUnsignedByte();
722             remainingLength += (digit & 127) * multiplier;
723             multiplier *= 128;
724             loops++;
725         } while ((digit & 128) != 0 && loops < 4);
726 
727         if (loops == 4 && (digit & 128) != 0) {
728             throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
729         }
730         return packInts(remainingLength, loops);
731     }
732 
733     private static final class Result<T> {
734 
735         private final T value;
736         private final int numberOfBytesConsumed;
737 
738         Result(T value, int numberOfBytesConsumed) {
739             this.value = value;
740             this.numberOfBytesConsumed = numberOfBytesConsumed;
741         }
742     }
743 
744     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
745         final long propertiesLength = decodeVariableByteInteger(buffer);
746         int totalPropertiesLength = unpackA(propertiesLength);
747         int numberOfBytesConsumed = unpackB(propertiesLength);
748         if (buffer.readableBytes() < totalPropertiesLength) {
749             // Force an early REPLAY to avoid repeatedly parsing the properties.
750             buffer.readSlice(totalPropertiesLength);
751         }
752 
753         MqttProperties decodedProperties = new MqttProperties();
754         while (numberOfBytesConsumed < totalPropertiesLength) {
755             long propertyId = decodeVariableByteInteger(buffer);
756             final int propertyIdValue = unpackA(propertyId);
757             numberOfBytesConsumed += unpackB(propertyId);
758             MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
759             switch (propertyType) {
760                 case PAYLOAD_FORMAT_INDICATOR:
761                 case REQUEST_PROBLEM_INFORMATION:
762                 case REQUEST_RESPONSE_INFORMATION:
763                 case MAXIMUM_QOS:
764                 case RETAIN_AVAILABLE:
765                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
766                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
767                 case SHARED_SUBSCRIPTION_AVAILABLE:
768                     final int b1 = buffer.readUnsignedByte();
769                     numberOfBytesConsumed++;
770                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
771                     break;
772                 case SERVER_KEEP_ALIVE:
773                 case RECEIVE_MAXIMUM:
774                 case TOPIC_ALIAS_MAXIMUM:
775                 case TOPIC_ALIAS:
776                     final int int2BytesResult = decodeMsbLsb(buffer);
777                     numberOfBytesConsumed += 2;
778                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
779                     break;
780                 case PUBLICATION_EXPIRY_INTERVAL:
781                 case SESSION_EXPIRY_INTERVAL:
782                 case WILL_DELAY_INTERVAL:
783                 case MAXIMUM_PACKET_SIZE:
784                     final int maxPacketSize = buffer.readInt();
785                     numberOfBytesConsumed += 4;
786                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
787                     break;
788                 case SUBSCRIPTION_IDENTIFIER:
789                     long vbIntegerResult = decodeVariableByteInteger(buffer);
790                     numberOfBytesConsumed += unpackB(vbIntegerResult);
791                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
792                     break;
793                 case CONTENT_TYPE:
794                 case RESPONSE_TOPIC:
795                 case ASSIGNED_CLIENT_IDENTIFIER:
796                 case AUTHENTICATION_METHOD:
797                 case RESPONSE_INFORMATION:
798                 case SERVER_REFERENCE:
799                 case REASON_STRING:
800                     final Result<String> stringResult = decodeString(buffer);
801                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
802                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
803                     break;
804                 case USER_PROPERTY:
805                     final Result<String> keyResult = decodeString(buffer);
806                     final Result<String> valueResult = decodeString(buffer);
807                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
808                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
809                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
810                     break;
811                 case CORRELATION_DATA:
812                 case AUTHENTICATION_DATA:
813                     final byte[] binaryDataResult = decodeByteArray(buffer);
814                     numberOfBytesConsumed += binaryDataResult.length + 2;
815                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
816                     break;
817                 default:
818                     //shouldn't reach here
819                     throw new DecoderException("Unknown property type: " + propertyType);
820             }
821         }
822 
823         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
824     }
825 }