View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
40  
41  /**
42   * Decodes Mqtt messages from bytes, following
43   * the MQTT protocol specification
44   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
45   * or
46   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
47   * version specified in the CONNECT message that first goes through the channel.
48   */
49  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
50  
/**
 * Phases the decoder passes through while parsing one MQTT message.
 * A message is parsed in the order {@code READ_FIXED_HEADER}, {@code READ_VARIABLE_HEADER},
 * {@code READ_PAYLOAD}; {@code BAD_MESSAGE} is entered when parsing fails.
 */
enum DecoderState {
    /** Initial phase: the fixed header is parsed next. */
    READ_FIXED_HEADER,

    /** The fixed header is known; the variable header is parsed next. */
    READ_VARIABLE_HEADER,

    /** Both headers are known; the payload is parsed next. */
    READ_PAYLOAD,

    /** A decoding failure was reported; any further input is discarded. */
    BAD_MESSAGE
}
62  
/** Fixed header of the message currently being decoded; reset to {@code null} after a message is emitted. */
private MqttFixedHeader mqttFixedHeader;
/** Variable header of the message currently being decoded; {@code null} for types without one. */
private Object variableHeader;
/** Bytes of the current message's variable part (variable header and payload) not yet accounted for. */
private int bytesRemainingInVariablePart;

/** Largest accepted "remaining length"; longer messages are reported as invalid. */
private final int maxBytesInMessage;
/** Upper bound handed to the client identifier validation of CONNECT payloads. */
private final int maxClientIdLength;

/**
 * Creates a decoder using {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 */
public MqttDecoder() {
    this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
}

/**
 * Creates a decoder with a custom message size limit and {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 *
 * @param maxBytesInMessage the message size limit, must be positive
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}

/**
 * Creates a decoder that starts in {@link DecoderState#READ_FIXED_HEADER}.
 *
 * @param maxBytesInMessage the message size limit, must be positive
 * @param maxClientIdLength the client identifier length limit, must be positive
 */
public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
    super(DecoderState.READ_FIXED_HEADER);
    // Validate in declaration order so the first offending argument is the one reported.
    final int checkedMaxBytes = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
    this.maxBytesInMessage = checkedMaxBytes;
    final int checkedMaxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
    this.maxClientIdLength = checkedMaxClientIdLength;
}
83  
84      @Override
85      protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
86          switch (state()) {
87              case READ_FIXED_HEADER: try {
88                  mqttFixedHeader = decodeFixedHeader(ctx, buffer);
89                  bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
90                  checkpoint(DecoderState.READ_VARIABLE_HEADER);
91                  // fall through
92              } catch (Exception cause) {
93                  out.add(invalidMessage(cause));
94                  return;
95              }
96  
97              case READ_VARIABLE_HEADER:  try {
98                  int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
99                  variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
100                 if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
101                     buffer.skipBytes(actualReadableBytes());
102                     throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
103                             + bytesRemainingBeforeVariableHeader);
104                 }
105                 checkpoint(DecoderState.READ_PAYLOAD);
106                 // fall through
107             } catch (Exception cause) {
108                 out.add(invalidMessage(cause));
109                 return;
110             }
111 
112             case READ_PAYLOAD: try {
113                 final Object decodedPayload =
114                         decodePayload(
115                                 ctx,
116                                 buffer,
117                                 mqttFixedHeader.messageType(),
118                                 maxClientIdLength,
119                                 variableHeader);
120                 checkpoint(DecoderState.READ_FIXED_HEADER);
121                 MqttMessage message = MqttMessageFactory.newMessage(
122                         mqttFixedHeader, variableHeader, decodedPayload);
123                 mqttFixedHeader = null;
124                 variableHeader = null;
125                 out.add(message);
126                 break;
127             } catch (Exception cause) {
128                 out.add(invalidMessage(cause));
129                 return;
130             }
131 
132             case BAD_MESSAGE:
133                 // Keep discarding until disconnection.
134                 buffer.skipBytes(actualReadableBytes());
135                 break;
136 
137             default:
138                 // Shouldn't reach here.
139                 throw new Error();
140         }
141     }
142 
143     private MqttMessage invalidMessage(Throwable cause) {
144       checkpoint(DecoderState.BAD_MESSAGE);
145       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
146     }
147 
148     /**
149      * Decodes the fixed header. It's one byte for the flags and then variable
150      * bytes for the remaining length.
151      *
152      * @see
153      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
154      *
155      * @param buffer the buffer to decode from
156      * @return the fixed header
157      */
158     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
159         short b1 = buffer.readUnsignedByte();
160 
161         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
162         boolean dupFlag = (b1 & 0x08) == 0x08;
163         int qosLevel = (b1 & 0x06) >> 1;
164         boolean retain = (b1 & 0x01) != 0;
165 
166         switch (messageType) {
167             case PUBLISH:
168                 if (qosLevel == 3) {
169                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
170                             + qosLevel + ')');
171                 }
172                 break;
173 
174             case PUBREL:
175             case SUBSCRIBE:
176             case UNSUBSCRIBE:
177                 if (dupFlag) {
178                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
179                             + " message, must be 0, found 1");
180                 }
181                 if (qosLevel != 1) {
182                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
183                             + " message, must be 1, found " + qosLevel);
184                 }
185                 if (retain) {
186                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
187                             + " message, must be 0, found 1");
188                 }
189                 break;
190 
191             case AUTH:
192             case CONNACK:
193             case CONNECT:
194             case DISCONNECT:
195             case PINGREQ:
196             case PINGRESP:
197             case PUBACK:
198             case PUBCOMP:
199             case PUBREC:
200             case SUBACK:
201             case UNSUBACK:
202                 if (dupFlag) {
203                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
204                             + " message, must be 0, found 1");
205                 }
206                 if (qosLevel != 0) {
207                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
208                             + " message, must be 0, found " + qosLevel);
209                 }
210                 if (retain) {
211                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
212                             + " message, must be 0, found 1");
213                 }
214                 break;
215             default:
216                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
217         }
218 
219         int remainingLength = 0;
220         int multiplier = 1;
221         short digit;
222         int loops = 0;
223         do {
224             digit = buffer.readUnsignedByte();
225             remainingLength += (digit & 127) * multiplier;
226             multiplier *= 128;
227             loops++;
228         } while ((digit & 128) != 0 && loops < 4);
229 
230         // MQTT protocol limits Remaining Length to 4 bytes
231         if (loops == 4 && (digit & 128) != 0) {
232             throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
233         }
234         MqttFixedHeader decodedFixedHeader =
235                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
236         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
237     }
238 
239     /**
240      * Decodes the variable header (if any)
241      * @param buffer the buffer to decode from
242      * @param mqttFixedHeader MqttFixedHeader of the same message
243      * @return the variable header
244      */
245     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
246         switch (mqttFixedHeader.messageType()) {
247             case CONNECT:
248                 return decodeConnectionVariableHeader(ctx, buffer);
249 
250             case CONNACK:
251                 return decodeConnAckVariableHeader(ctx, buffer);
252 
253             case UNSUBSCRIBE:
254             case SUBSCRIBE:
255             case SUBACK:
256             case UNSUBACK:
257                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
258 
259             case PUBACK:
260             case PUBREC:
261             case PUBCOMP:
262             case PUBREL:
263                 return decodePubReplyMessage(buffer);
264 
265             case PUBLISH:
266                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
267 
268             case DISCONNECT:
269             case AUTH:
270                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
271 
272             case PINGREQ:
273             case PINGRESP:
274                 // Empty variable header
275                 return null;
276             default:
277                 //shouldn't reach here
278                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
279         }
280     }
281 
282     private MqttConnectVariableHeader decodeConnectionVariableHeader(
283             ChannelHandlerContext ctx,
284             ByteBuf buffer) {
285         final Result<String> protoString = decodeString(buffer);
286         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
287 
288         final byte protocolLevel = buffer.readByte();
289         numberOfBytesConsumed += 1;
290 
291         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
292         MqttCodecUtil.setMqttVersion(ctx, version);
293 
294         final int b1 = buffer.readUnsignedByte();
295         numberOfBytesConsumed += 1;
296 
297         final int keepAlive = decodeMsbLsb(buffer);
298         numberOfBytesConsumed += 2;
299 
300         final boolean hasUserName = (b1 & 0x80) == 0x80;
301         final boolean hasPassword = (b1 & 0x40) == 0x40;
302         final boolean willRetain = (b1 & 0x20) == 0x20;
303         final int willQos = (b1 & 0x18) >> 3;
304         final boolean willFlag = (b1 & 0x04) == 0x04;
305         final boolean cleanSession = (b1 & 0x02) == 0x02;
306         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
307             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
308             if (!zeroReservedFlag) {
309                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
310                 // set to zero and disconnect the Client if it is not zero.
311                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
312                 throw new DecoderException("non-zero reserved flag");
313             }
314         }
315 
316         final MqttProperties properties;
317         if (version == MqttVersion.MQTT_5) {
318             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
319             properties = propertiesResult.value;
320             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
321         } else {
322             properties = MqttProperties.NO_PROPERTIES;
323         }
324 
325         bytesRemainingInVariablePart -= numberOfBytesConsumed;
326         return new MqttConnectVariableHeader(
327                 version.protocolName(),
328                 version.protocolLevel(),
329                 hasUserName,
330                 hasPassword,
331                 willRetain,
332                 willQos,
333                 willFlag,
334                 cleanSession,
335                 keepAlive,
336                 properties);
337     }
338 
339     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
340             ChannelHandlerContext ctx,
341             ByteBuf buffer) {
342         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
343         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
344         byte returnCode = buffer.readByte();
345 
346         final MqttProperties properties;
347         if (mqttVersion == MqttVersion.MQTT_5) {
348             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
349             properties = propertiesResult.value;
350             bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
351         } else {
352             properties = MqttProperties.NO_PROPERTIES;
353             bytesRemainingInVariablePart -= 2;
354         }
355 
356         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
357     }
358 
359     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
360             ChannelHandlerContext ctx,
361             ByteBuf buffer) {
362         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
363         final int packetId = decodeMessageId(buffer);
364 
365         if (mqttVersion == MqttVersion.MQTT_5) {
366             final Result<MqttProperties> properties = decodeProperties(buffer);
367             bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
368             return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
369         } else {
370             bytesRemainingInVariablePart -= 2;
371             return new MqttMessageIdAndPropertiesVariableHeader(packetId,
372                                                                 MqttProperties.NO_PROPERTIES);
373         }
374     }
375 
376     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
377         final int packetId = decodeMessageId(buffer);
378 
379         final int packetIdNumberOfBytesConsumed = 2;
380         if (bytesRemainingInVariablePart > 3) {
381             final byte reasonCode = buffer.readByte();
382             final Result<MqttProperties> properties = decodeProperties(buffer);
383             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
384             return new MqttPubReplyMessageVariableHeader(packetId,
385                     reasonCode,
386                     properties.value);
387         } else if (bytesRemainingInVariablePart > 2) {
388             final byte reasonCode = buffer.readByte();
389             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
390             return new MqttPubReplyMessageVariableHeader(packetId,
391                     reasonCode,
392                     MqttProperties.NO_PROPERTIES);
393         } else {
394             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
395             return new MqttPubReplyMessageVariableHeader(packetId,
396                     (byte) 0,
397                     MqttProperties.NO_PROPERTIES);
398         }
399     }
400 
401     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
402             ByteBuf buffer) {
403         final byte reasonCode;
404         final MqttProperties properties;
405         if (bytesRemainingInVariablePart > 1) {
406             reasonCode = buffer.readByte();
407             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
408             properties = propertiesResult.value;
409             bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
410         } else if (bytesRemainingInVariablePart > 0) {
411             reasonCode = buffer.readByte();
412             properties = MqttProperties.NO_PROPERTIES;
413             --bytesRemainingInVariablePart;
414         } else {
415             reasonCode = 0;
416             properties = MqttProperties.NO_PROPERTIES;
417         }
418 
419         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
420     }
421 
422     private MqttPublishVariableHeader decodePublishVariableHeader(
423             ChannelHandlerContext ctx,
424             ByteBuf buffer,
425             MqttFixedHeader mqttFixedHeader) {
426         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
427         final Result<String> decodedTopic = decodeString(buffer);
428         if (!isValidPublishTopicName(decodedTopic.value)) {
429             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
430         }
431         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
432 
433         int messageId = -1;
434         if (mqttFixedHeader.qosLevel().value() > 0) {
435             messageId = decodeMessageId(buffer);
436             numberOfBytesConsumed += 2;
437         }
438 
439         final MqttProperties properties;
440         if (mqttVersion == MqttVersion.MQTT_5) {
441             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
442             properties = propertiesResult.value;
443             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
444         } else {
445             properties = MqttProperties.NO_PROPERTIES;
446         }
447 
448         bytesRemainingInVariablePart -= numberOfBytesConsumed;
449         return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
450     }
451 
452     /**
453      * @return messageId with numberOfBytesConsumed is 2
454      */
455     private static int decodeMessageId(ByteBuf buffer) {
456         final int messageId = decodeMsbLsb(buffer);
457         if (!isValidMessageId(messageId)) {
458             throw new DecoderException("invalid messageId: " + messageId);
459         }
460         return messageId;
461     }
462 
463     /**
464      * Decodes the payload.
465      *
466      * @param buffer the buffer to decode from
467      * @param messageType  type of the message being decoded
468      * @param variableHeader variable header of the same message
469      * @return the payload
470      */
471     private Object decodePayload(
472             ChannelHandlerContext ctx,
473             ByteBuf buffer,
474             MqttMessageType messageType,
475             int maxClientIdLength,
476             Object variableHeader) {
477         switch (messageType) {
478             case CONNECT:
479                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
480 
481             case SUBSCRIBE:
482                 return decodeSubscribePayload(buffer);
483 
484             case SUBACK:
485                 return decodeSubackPayload(buffer);
486 
487             case UNSUBSCRIBE:
488                 return decodeUnsubscribePayload(buffer);
489 
490             case UNSUBACK:
491                 return decodeUnsubAckPayload(ctx, buffer);
492 
493             case PUBLISH:
494                 return decodePublishPayload(buffer);
495 
496             default:
497                 // unknown payload , no byte consumed
498                 return null;
499         }
500     }
501 
502     private MqttConnectPayload decodeConnectionPayload(
503             ByteBuf buffer,
504             int maxClientIdLength,
505             MqttConnectVariableHeader mqttConnectVariableHeader) {
506         final Result<String> decodedClientId = decodeString(buffer);
507         final String decodedClientIdValue = decodedClientId.value;
508         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
509                 (byte) mqttConnectVariableHeader.version());
510         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
511             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
512         }
513         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
514 
515         Result<String> decodedWillTopic = null;
516         byte[] decodedWillMessage = null;
517 
518         final MqttProperties willProperties;
519         if (mqttConnectVariableHeader.isWillFlag()) {
520             if (mqttVersion == MqttVersion.MQTT_5) {
521                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
522                 willProperties = propertiesResult.value;
523                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
524             } else {
525                 willProperties = MqttProperties.NO_PROPERTIES;
526             }
527             decodedWillTopic = decodeString(buffer, 0, 32767);
528             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
529             decodedWillMessage = decodeByteArray(buffer);
530             numberOfBytesConsumed += decodedWillMessage.length + 2;
531         } else {
532             willProperties = MqttProperties.NO_PROPERTIES;
533         }
534         Result<String> decodedUserName = null;
535         byte[] decodedPassword = null;
536         if (mqttConnectVariableHeader.hasUserName()) {
537             decodedUserName = decodeString(buffer);
538             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
539         }
540         if (mqttConnectVariableHeader.hasPassword()) {
541             decodedPassword = decodeByteArray(buffer);
542             numberOfBytesConsumed += decodedPassword.length + 2;
543         }
544 
545         validateNoBytesRemain(numberOfBytesConsumed);
546         return new MqttConnectPayload(
547                         decodedClientId.value,
548                         willProperties,
549                         decodedWillTopic != null ? decodedWillTopic.value : null,
550                         decodedWillMessage,
551                         decodedUserName != null ? decodedUserName.value : null,
552                         decodedPassword);
553     }
554 
555     private MqttSubscribePayload decodeSubscribePayload(
556             ByteBuf buffer) {
557         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
558         int numberOfBytesConsumed = 0;
559         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
560             final Result<String> decodedTopicName = decodeString(buffer);
561             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
562             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
563             final short optionByte = buffer.readUnsignedByte();
564 
565             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
566             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
567             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
568             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
569 
570             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
571                     noLocal,
572                     retainAsPublished,
573                     retainHandling);
574 
575             numberOfBytesConsumed++;
576             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
577         }
578         validateNoBytesRemain(numberOfBytesConsumed);
579         return new MqttSubscribePayload(subscribeTopics);
580     }
581 
582     private MqttSubAckPayload decodeSubackPayload(
583             ByteBuf buffer) {
584         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
585         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
586         int numberOfBytesConsumed = 0;
587         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
588             int reasonCode = buffer.readUnsignedByte();
589             numberOfBytesConsumed++;
590             grantedQos.add(reasonCode);
591         }
592         validateNoBytesRemain(numberOfBytesConsumed);
593         return new MqttSubAckPayload(grantedQos);
594     }
595 
596     private MqttUnsubAckPayload decodeUnsubAckPayload(
597         ChannelHandlerContext ctx,
598         ByteBuf buffer) {
599         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
600         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
601         int numberOfBytesConsumed = 0;
602         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
603             short reasonCode = buffer.readUnsignedByte();
604             numberOfBytesConsumed++;
605             reasonCodes.add(reasonCode);
606         }
607         validateNoBytesRemain(numberOfBytesConsumed);
608         return new MqttUnsubAckPayload(reasonCodes);
609     }
610 
611     private MqttUnsubscribePayload decodeUnsubscribePayload(
612             ByteBuf buffer) {
613         final List<String> unsubscribeTopics = new ArrayList<String>();
614         int numberOfBytesConsumed = 0;
615         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
616             final Result<String> decodedTopicName = decodeString(buffer);
617             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
618             unsubscribeTopics.add(decodedTopicName.value);
619         }
620         validateNoBytesRemain(numberOfBytesConsumed);
621         return new MqttUnsubscribePayload(unsubscribeTopics);
622     }
623 
624     private ByteBuf decodePublishPayload(ByteBuf buffer) {
625         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
626     }
627 
628     private void validateNoBytesRemain(int numberOfBytesConsumed) {
629         bytesRemainingInVariablePart -= numberOfBytesConsumed;
630         if (bytesRemainingInVariablePart != 0) {
631             throw new DecoderException(
632                     "non-zero remaining payload bytes: " +
633                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
634         }
635     }
636 
637     private static Result<String> decodeString(ByteBuf buffer) {
638         return decodeString(buffer, 0, Integer.MAX_VALUE);
639     }
640 
641     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
642         int size = decodeMsbLsb(buffer);
643         int numberOfBytesConsumed = 2;
644         if (size < minBytes || size > maxBytes) {
645             buffer.skipBytes(size);
646             numberOfBytesConsumed += size;
647             return new Result<String>(null, numberOfBytesConsumed);
648         }
649         String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
650         buffer.skipBytes(size);
651         numberOfBytesConsumed += size;
652         return new Result<String>(s, numberOfBytesConsumed);
653     }
654 
655     /**
656      *
657      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
658      */
659     private static byte[] decodeByteArray(ByteBuf buffer) {
660         int size = decodeMsbLsb(buffer);
661         byte[] bytes = new byte[size];
662         buffer.readBytes(bytes);
663         return bytes;
664     }
665 
666     // packing utils to reduce the amount of garbage while decoding ints
667     private static long packInts(int a, int b) {
668         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
669     }
670 
671     private static int unpackA(long ints) {
672         return (int) (ints >> 32);
673     }
674 
675     private static int unpackB(long ints) {
676         return (int) ints;
677     }
678 
679     /**
680      *  numberOfBytesConsumed = 2. return decoded result.
681      */
682     private static int decodeMsbLsb(ByteBuf buffer) {
683         int min = 0;
684         int max = 65535;
685         short msbSize = buffer.readUnsignedByte();
686         short lsbSize = buffer.readUnsignedByte();
687         int result = msbSize << 8 | lsbSize;
688         if (result < min || result > max) {
689             result = -1;
690         }
691         return result;
692     }
693 
694     /**
695      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
696      *
697      * @param buffer the buffer to decode from
698      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
699      * @throws DecoderException if bad MQTT protocol limits Remaining Length
700      */
701     private static long decodeVariableByteInteger(ByteBuf buffer) {
702         int remainingLength = 0;
703         int multiplier = 1;
704         short digit;
705         int loops = 0;
706         do {
707             digit = buffer.readUnsignedByte();
708             remainingLength += (digit & 127) * multiplier;
709             multiplier *= 128;
710             loops++;
711         } while ((digit & 128) != 0 && loops < 4);
712 
713         if (loops == 4 && (digit & 128) != 0) {
714             throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
715         }
716         return packInts(remainingLength, loops);
717     }
718 
719     private static final class Result<T> {
720 
721         private final T value;
722         private final int numberOfBytesConsumed;
723 
724         Result(T value, int numberOfBytesConsumed) {
725             this.value = value;
726             this.numberOfBytesConsumed = numberOfBytesConsumed;
727         }
728     }
729 
730     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
731         final long propertiesLength = decodeVariableByteInteger(buffer);
732         int totalPropertiesLength = unpackA(propertiesLength);
733         int numberOfBytesConsumed = unpackB(propertiesLength);
734 
735         MqttProperties decodedProperties = new MqttProperties();
736         while (numberOfBytesConsumed < totalPropertiesLength) {
737             long propertyId = decodeVariableByteInteger(buffer);
738             final int propertyIdValue = unpackA(propertyId);
739             numberOfBytesConsumed += unpackB(propertyId);
740             MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
741             switch (propertyType) {
742                 case PAYLOAD_FORMAT_INDICATOR:
743                 case REQUEST_PROBLEM_INFORMATION:
744                 case REQUEST_RESPONSE_INFORMATION:
745                 case MAXIMUM_QOS:
746                 case RETAIN_AVAILABLE:
747                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
748                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
749                 case SHARED_SUBSCRIPTION_AVAILABLE:
750                     final int b1 = buffer.readUnsignedByte();
751                     numberOfBytesConsumed++;
752                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
753                     break;
754                 case SERVER_KEEP_ALIVE:
755                 case RECEIVE_MAXIMUM:
756                 case TOPIC_ALIAS_MAXIMUM:
757                 case TOPIC_ALIAS:
758                     final int int2BytesResult = decodeMsbLsb(buffer);
759                     numberOfBytesConsumed += 2;
760                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
761                     break;
762                 case PUBLICATION_EXPIRY_INTERVAL:
763                 case SESSION_EXPIRY_INTERVAL:
764                 case WILL_DELAY_INTERVAL:
765                 case MAXIMUM_PACKET_SIZE:
766                     final int maxPacketSize = buffer.readInt();
767                     numberOfBytesConsumed += 4;
768                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
769                     break;
770                 case SUBSCRIPTION_IDENTIFIER:
771                     long vbIntegerResult = decodeVariableByteInteger(buffer);
772                     numberOfBytesConsumed += unpackB(vbIntegerResult);
773                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
774                     break;
775                 case CONTENT_TYPE:
776                 case RESPONSE_TOPIC:
777                 case ASSIGNED_CLIENT_IDENTIFIER:
778                 case AUTHENTICATION_METHOD:
779                 case RESPONSE_INFORMATION:
780                 case SERVER_REFERENCE:
781                 case REASON_STRING:
782                     final Result<String> stringResult = decodeString(buffer);
783                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
784                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
785                     break;
786                 case USER_PROPERTY:
787                     final Result<String> keyResult = decodeString(buffer);
788                     final Result<String> valueResult = decodeString(buffer);
789                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
790                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
791                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
792                     break;
793                 case CORRELATION_DATA:
794                 case AUTHENTICATION_DATA:
795                     final byte[] binaryDataResult = decodeByteArray(buffer);
796                     numberOfBytesConsumed += binaryDataResult.length + 2;
797                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
798                     break;
799                 default:
800                     //shouldn't reach here
801                     throw new DecoderException("Unknown property type: " + propertyType);
802             }
803         }
804 
805         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
806     }
807 }