View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.Signal;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
41  
42  /**
43   * Decodes Mqtt messages from bytes, following
44   * the MQTT protocol specification
45   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
46   * or
47   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
48   * version specified in the CONNECT message that first goes through the channel.
49   */
50  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
51  
52      /**
53       * States of the decoder.
54       * We start at READ_FIXED_HEADER, followed by
55       * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
56       */
57      enum DecoderState {
58          READ_FIXED_HEADER,
59          READ_VARIABLE_HEADER,
60          READ_PAYLOAD,
61          BAD_MESSAGE,
62      }
63  
64      private MqttFixedHeader mqttFixedHeader;
65      private Object variableHeader;
66      private int bytesRemainingInVariablePart;
67  
68      private final int maxBytesInMessage;
69      private final int maxClientIdLength;
70  
71      public MqttDecoder() {
72        this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
73      }
74  
75      public MqttDecoder(int maxBytesInMessage) {
76          this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
77      }
78  
79      public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
80          super(DecoderState.READ_FIXED_HEADER);
81          this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
82          this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
83      }
84  
85      @Override
86      protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
87          switch (state()) {
88              case READ_FIXED_HEADER: try {
89                  mqttFixedHeader = decodeFixedHeader(ctx, buffer);
90                  bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
91                  checkpoint(DecoderState.READ_VARIABLE_HEADER);
92                  // fall through
93              } catch (Exception cause) {
94                  out.add(invalidMessage(cause));
95                  return;
96              }
97  
98              case READ_VARIABLE_HEADER:  try {
99                  int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
100                 boolean bailOut = false;
101                 try {
102                     variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
103                 } catch (Signal signal) {
104                     if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
105                         // We couldn't parse the complete message, and it's already too large.
106                         // Swallow the Signal (we don't need more data) and instead bail out
107                         // and throw the TooLongFrameException below.
108                         bailOut = true;
109                     } else {
110                         // Ask for REPLAY if the current message is within maxBytesInMessage.
111                         throw signal;
112                     }
113                 }
114                 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
115                     buffer.skipBytes(actualReadableBytes());
116                     throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
117                             + bytesRemainingBeforeVariableHeader);
118                 }
119                 checkpoint(DecoderState.READ_PAYLOAD);
120                 // fall through
121             } catch (Exception cause) {
122                 out.add(invalidMessage(cause));
123                 return;
124             }
125 
126             case READ_PAYLOAD: try {
127                 final Object decodedPayload =
128                         decodePayload(
129                                 ctx,
130                                 buffer,
131                                 mqttFixedHeader.messageType(),
132                                 maxClientIdLength,
133                                 variableHeader);
134                 checkpoint(DecoderState.READ_FIXED_HEADER);
135                 MqttMessage message = MqttMessageFactory.newMessage(
136                         mqttFixedHeader, variableHeader, decodedPayload);
137                 mqttFixedHeader = null;
138                 variableHeader = null;
139                 out.add(message);
140                 break;
141             } catch (Exception cause) {
142                 out.add(invalidMessage(cause));
143                 return;
144             }
145 
146             case BAD_MESSAGE:
147                 // Keep discarding until disconnection.
148                 buffer.skipBytes(actualReadableBytes());
149                 break;
150 
151             default:
152                 // Shouldn't reach here.
153                 throw new Error();
154         }
155     }
156 
157     private MqttMessage invalidMessage(Throwable cause) {
158       checkpoint(DecoderState.BAD_MESSAGE);
159       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
160     }
161 
162     /**
163      * Decodes the fixed header. It's one byte for the flags and then variable
164      * bytes for the remaining length.
165      *
166      * @see
167      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
168      *
169      * @param buffer the buffer to decode from
170      * @return the fixed header
171      */
172     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
173         short b1 = buffer.readUnsignedByte();
174 
175         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
176         boolean dupFlag = (b1 & 0x08) == 0x08;
177         int qosLevel = (b1 & 0x06) >> 1;
178         boolean retain = (b1 & 0x01) != 0;
179 
180         switch (messageType) {
181             case PUBLISH:
182                 if (qosLevel == 3) {
183                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
184                             + qosLevel + ')');
185                 }
186                 break;
187 
188             case PUBREL:
189             case SUBSCRIBE:
190             case UNSUBSCRIBE:
191                 if (dupFlag) {
192                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
193                             + " message, must be 0, found 1");
194                 }
195                 if (qosLevel != 1) {
196                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
197                             + " message, must be 1, found " + qosLevel);
198                 }
199                 if (retain) {
200                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
201                             + " message, must be 0, found 1");
202                 }
203                 break;
204 
205             case AUTH:
206             case CONNACK:
207             case CONNECT:
208             case DISCONNECT:
209             case PINGREQ:
210             case PINGRESP:
211             case PUBACK:
212             case PUBCOMP:
213             case PUBREC:
214             case SUBACK:
215             case UNSUBACK:
216                 if (dupFlag) {
217                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
218                             + " message, must be 0, found 1");
219                 }
220                 if (qosLevel != 0) {
221                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
222                             + " message, must be 0, found " + qosLevel);
223                 }
224                 if (retain) {
225                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
226                             + " message, must be 0, found 1");
227                 }
228                 break;
229             default:
230                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
231         }
232 
233         int remainingLength = 0;
234         int multiplier = 1;
235         short digit;
236         int loops = 0;
237         do {
238             digit = buffer.readUnsignedByte();
239             remainingLength += (digit & 127) * multiplier;
240             multiplier *= 128;
241             loops++;
242         } while ((digit & 128) != 0 && loops < 4);
243 
244         // MQTT protocol limits Remaining Length to 4 bytes
245         if (loops == 4 && (digit & 128) != 0) {
246             throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
247         }
248         MqttFixedHeader decodedFixedHeader =
249                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
250         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
251     }
252 
253     /**
254      * Decodes the variable header (if any)
255      * @param buffer the buffer to decode from
256      * @param mqttFixedHeader MqttFixedHeader of the same message
257      * @return the variable header
258      */
259     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
260         switch (mqttFixedHeader.messageType()) {
261             case CONNECT:
262                 return decodeConnectionVariableHeader(ctx, buffer);
263 
264             case CONNACK:
265                 return decodeConnAckVariableHeader(ctx, buffer);
266 
267             case UNSUBSCRIBE:
268             case SUBSCRIBE:
269             case SUBACK:
270             case UNSUBACK:
271                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
272 
273             case PUBACK:
274             case PUBREC:
275             case PUBCOMP:
276             case PUBREL:
277                 return decodePubReplyMessage(buffer);
278 
279             case PUBLISH:
280                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
281 
282             case DISCONNECT:
283             case AUTH:
284                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
285 
286             case PINGREQ:
287             case PINGRESP:
288                 // Empty variable header
289                 return null;
290             default:
291                 //shouldn't reach here
292                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
293         }
294     }
295 
296     private MqttConnectVariableHeader decodeConnectionVariableHeader(
297             ChannelHandlerContext ctx,
298             ByteBuf buffer) {
299         final Result<String> protoString = decodeString(buffer);
300         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
301 
302         final byte protocolLevel = buffer.readByte();
303         numberOfBytesConsumed += 1;
304 
305         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
306         MqttCodecUtil.setMqttVersion(ctx, version);
307 
308         final int b1 = buffer.readUnsignedByte();
309         numberOfBytesConsumed += 1;
310 
311         final int keepAlive = decodeMsbLsb(buffer);
312         numberOfBytesConsumed += 2;
313 
314         final boolean hasUserName = (b1 & 0x80) == 0x80;
315         final boolean hasPassword = (b1 & 0x40) == 0x40;
316         final boolean willRetain = (b1 & 0x20) == 0x20;
317         final int willQos = (b1 & 0x18) >> 3;
318         final boolean willFlag = (b1 & 0x04) == 0x04;
319         final boolean cleanSession = (b1 & 0x02) == 0x02;
320         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
321             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
322             if (!zeroReservedFlag) {
323                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
324                 // set to zero and disconnect the Client if it is not zero.
325                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
326                 throw new DecoderException("non-zero reserved flag");
327             }
328         }
329 
330         final MqttProperties properties;
331         if (version == MqttVersion.MQTT_5) {
332             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
333             properties = propertiesResult.value;
334             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
335         } else {
336             properties = MqttProperties.NO_PROPERTIES;
337         }
338 
339         bytesRemainingInVariablePart -= numberOfBytesConsumed;
340         return new MqttConnectVariableHeader(
341                 version.protocolName(),
342                 version.protocolLevel(),
343                 hasUserName,
344                 hasPassword,
345                 willRetain,
346                 willQos,
347                 willFlag,
348                 cleanSession,
349                 keepAlive,
350                 properties);
351     }
352 
353     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
354             ChannelHandlerContext ctx,
355             ByteBuf buffer) {
356         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
357         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
358         byte returnCode = buffer.readByte();
359 
360         final MqttProperties properties;
361         if (mqttVersion == MqttVersion.MQTT_5) {
362             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
363             properties = propertiesResult.value;
364             bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
365         } else {
366             properties = MqttProperties.NO_PROPERTIES;
367             bytesRemainingInVariablePart -= 2;
368         }
369 
370         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
371     }
372 
373     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
374             ChannelHandlerContext ctx,
375             ByteBuf buffer) {
376         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
377         final int packetId = decodeMessageId(buffer);
378 
379         if (mqttVersion == MqttVersion.MQTT_5) {
380             final Result<MqttProperties> properties = decodeProperties(buffer);
381             bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
382             return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
383         } else {
384             bytesRemainingInVariablePart -= 2;
385             return new MqttMessageIdAndPropertiesVariableHeader(packetId,
386                                                                 MqttProperties.NO_PROPERTIES);
387         }
388     }
389 
390     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
391         final int packetId = decodeMessageId(buffer);
392 
393         final int packetIdNumberOfBytesConsumed = 2;
394         if (bytesRemainingInVariablePart > 3) {
395             final byte reasonCode = buffer.readByte();
396             final Result<MqttProperties> properties = decodeProperties(buffer);
397             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
398             return new MqttPubReplyMessageVariableHeader(packetId,
399                     reasonCode,
400                     properties.value);
401         } else if (bytesRemainingInVariablePart > 2) {
402             final byte reasonCode = buffer.readByte();
403             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
404             return new MqttPubReplyMessageVariableHeader(packetId,
405                     reasonCode,
406                     MqttProperties.NO_PROPERTIES);
407         } else {
408             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
409             return new MqttPubReplyMessageVariableHeader(packetId,
410                     (byte) 0,
411                     MqttProperties.NO_PROPERTIES);
412         }
413     }
414 
415     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
416             ByteBuf buffer) {
417         final byte reasonCode;
418         final MqttProperties properties;
419         if (bytesRemainingInVariablePart > 1) {
420             reasonCode = buffer.readByte();
421             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
422             properties = propertiesResult.value;
423             bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
424         } else if (bytesRemainingInVariablePart > 0) {
425             reasonCode = buffer.readByte();
426             properties = MqttProperties.NO_PROPERTIES;
427             --bytesRemainingInVariablePart;
428         } else {
429             reasonCode = 0;
430             properties = MqttProperties.NO_PROPERTIES;
431         }
432 
433         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
434     }
435 
436     private MqttPublishVariableHeader decodePublishVariableHeader(
437             ChannelHandlerContext ctx,
438             ByteBuf buffer,
439             MqttFixedHeader mqttFixedHeader) {
440         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
441         final Result<String> decodedTopic = decodeString(buffer);
442         if (!isValidPublishTopicName(decodedTopic.value)) {
443             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
444         }
445         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
446 
447         int messageId = -1;
448         if (mqttFixedHeader.qosLevel().value() > 0) {
449             messageId = decodeMessageId(buffer);
450             numberOfBytesConsumed += 2;
451         }
452 
453         final MqttProperties properties;
454         if (mqttVersion == MqttVersion.MQTT_5) {
455             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
456             properties = propertiesResult.value;
457             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
458         } else {
459             properties = MqttProperties.NO_PROPERTIES;
460         }
461 
462         bytesRemainingInVariablePart -= numberOfBytesConsumed;
463         return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
464     }
465 
466     /**
467      * @return messageId with numberOfBytesConsumed is 2
468      */
469     private static int decodeMessageId(ByteBuf buffer) {
470         final int messageId = decodeMsbLsb(buffer);
471         if (!isValidMessageId(messageId)) {
472             throw new DecoderException("invalid messageId: " + messageId);
473         }
474         return messageId;
475     }
476 
477     /**
478      * Decodes the payload.
479      *
480      * @param buffer the buffer to decode from
481      * @param messageType  type of the message being decoded
482      * @param variableHeader variable header of the same message
483      * @return the payload
484      */
485     private Object decodePayload(
486             ChannelHandlerContext ctx,
487             ByteBuf buffer,
488             MqttMessageType messageType,
489             int maxClientIdLength,
490             Object variableHeader) {
491         switch (messageType) {
492             case CONNECT:
493                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
494 
495             case SUBSCRIBE:
496                 return decodeSubscribePayload(buffer);
497 
498             case SUBACK:
499                 return decodeSubackPayload(buffer);
500 
501             case UNSUBSCRIBE:
502                 return decodeUnsubscribePayload(buffer);
503 
504             case UNSUBACK:
505                 return decodeUnsubAckPayload(ctx, buffer);
506 
507             case PUBLISH:
508                 return decodePublishPayload(buffer);
509 
510             default:
511                 // No payload for this message type. If the fixed header's Remaining Length
512                 // claimed bytes beyond what the variable header consumed (e.g. a PINGREQ
513                 // with non-zero Remaining Length), the frame is malformed.
514                 // See https://github.com/netty/netty/issues/16851
515                 validateNoBytesRemain(0);
516                 return null;
517         }
518     }
519 
520     private MqttConnectPayload decodeConnectionPayload(
521             ByteBuf buffer,
522             int maxClientIdLength,
523             MqttConnectVariableHeader mqttConnectVariableHeader) {
524         final Result<String> decodedClientId = decodeString(buffer);
525         final String decodedClientIdValue = decodedClientId.value;
526         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
527                 (byte) mqttConnectVariableHeader.version());
528         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
529             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
530         }
531         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
532 
533         Result<String> decodedWillTopic = null;
534         byte[] decodedWillMessage = null;
535 
536         final MqttProperties willProperties;
537         if (mqttConnectVariableHeader.isWillFlag()) {
538             if (mqttVersion == MqttVersion.MQTT_5) {
539                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
540                 willProperties = propertiesResult.value;
541                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
542             } else {
543                 willProperties = MqttProperties.NO_PROPERTIES;
544             }
545             decodedWillTopic = decodeString(buffer, 0, 32767);
546             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
547             decodedWillMessage = decodeByteArray(buffer);
548             numberOfBytesConsumed += decodedWillMessage.length + 2;
549         } else {
550             willProperties = MqttProperties.NO_PROPERTIES;
551         }
552         Result<String> decodedUserName = null;
553         byte[] decodedPassword = null;
554         if (mqttConnectVariableHeader.hasUserName()) {
555             decodedUserName = decodeString(buffer);
556             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
557         }
558         if (mqttConnectVariableHeader.hasPassword()) {
559             decodedPassword = decodeByteArray(buffer);
560             numberOfBytesConsumed += decodedPassword.length + 2;
561         }
562 
563         validateNoBytesRemain(numberOfBytesConsumed);
564         return new MqttConnectPayload(
565                         decodedClientId.value,
566                         willProperties,
567                         decodedWillTopic != null ? decodedWillTopic.value : null,
568                         decodedWillMessage,
569                         decodedUserName != null ? decodedUserName.value : null,
570                         decodedPassword);
571     }
572 
573     private MqttSubscribePayload decodeSubscribePayload(
574             ByteBuf buffer) {
575         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
576         int numberOfBytesConsumed = 0;
577         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
578             final Result<String> decodedTopicName = decodeString(buffer);
579             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
580             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
581             final short optionByte = buffer.readUnsignedByte();
582 
583             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
584             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
585             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
586             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
587 
588             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
589                     noLocal,
590                     retainAsPublished,
591                     retainHandling);
592 
593             numberOfBytesConsumed++;
594             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
595         }
596         validateNoBytesRemain(numberOfBytesConsumed);
597         return new MqttSubscribePayload(subscribeTopics);
598     }
599 
600     private MqttSubAckPayload decodeSubackPayload(
601             ByteBuf buffer) {
602         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
603         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
604         int numberOfBytesConsumed = 0;
605         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
606             int reasonCode = buffer.readUnsignedByte();
607             numberOfBytesConsumed++;
608             grantedQos.add(reasonCode);
609         }
610         validateNoBytesRemain(numberOfBytesConsumed);
611         return new MqttSubAckPayload(grantedQos);
612     }
613 
614     private MqttUnsubAckPayload decodeUnsubAckPayload(
615         ChannelHandlerContext ctx,
616         ByteBuf buffer) {
617         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
618         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
619         int numberOfBytesConsumed = 0;
620         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
621             short reasonCode = buffer.readUnsignedByte();
622             numberOfBytesConsumed++;
623             reasonCodes.add(reasonCode);
624         }
625         validateNoBytesRemain(numberOfBytesConsumed);
626         return new MqttUnsubAckPayload(reasonCodes);
627     }
628 
629     private MqttUnsubscribePayload decodeUnsubscribePayload(
630             ByteBuf buffer) {
631         final List<String> unsubscribeTopics = new ArrayList<String>();
632         int numberOfBytesConsumed = 0;
633         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
634             final Result<String> decodedTopicName = decodeString(buffer);
635             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
636             unsubscribeTopics.add(decodedTopicName.value);
637         }
638         validateNoBytesRemain(numberOfBytesConsumed);
639         return new MqttUnsubscribePayload(unsubscribeTopics);
640     }
641 
642     private ByteBuf decodePublishPayload(ByteBuf buffer) {
643         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
644     }
645 
646     private void validateNoBytesRemain(int numberOfBytesConsumed) {
647         bytesRemainingInVariablePart -= numberOfBytesConsumed;
648         if (bytesRemainingInVariablePart != 0) {
649             throw new DecoderException(
650                     "non-zero remaining payload bytes: " +
651                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
652         }
653     }
654 
655     private static Result<String> decodeString(ByteBuf buffer) {
656         return decodeString(buffer, 0, Integer.MAX_VALUE);
657     }
658 
659     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
660         int size = decodeMsbLsb(buffer);
661         int numberOfBytesConsumed = 2;
662         if (size < minBytes || size > maxBytes) {
663             buffer.skipBytes(size);
664             numberOfBytesConsumed += size;
665             return new Result<String>(null, numberOfBytesConsumed);
666         }
667         String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
668         buffer.skipBytes(size);
669         numberOfBytesConsumed += size;
670         return new Result<String>(s, numberOfBytesConsumed);
671     }
672 
673     /**
674      *
675      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
676      */
677     private static byte[] decodeByteArray(ByteBuf buffer) {
678         int size = decodeMsbLsb(buffer);
679         byte[] bytes = new byte[size];
680         buffer.readBytes(bytes);
681         return bytes;
682     }
683 
684     // packing utils to reduce the amount of garbage while decoding ints
685     private static long packInts(int a, int b) {
686         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
687     }
688 
689     private static int unpackA(long ints) {
690         return (int) (ints >> 32);
691     }
692 
693     private static int unpackB(long ints) {
694         return (int) ints;
695     }
696 
697     /**
698      *  numberOfBytesConsumed = 2. return decoded result.
699      */
700     private static int decodeMsbLsb(ByteBuf buffer) {
701         int min = 0;
702         int max = 65535;
703         short msbSize = buffer.readUnsignedByte();
704         short lsbSize = buffer.readUnsignedByte();
705         int result = msbSize << 8 | lsbSize;
706         if (result < min || result > max) {
707             result = -1;
708         }
709         return result;
710     }
711 
712     /**
713      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
714      *
715      * @param buffer the buffer to decode from
716      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
717      * @throws DecoderException if bad MQTT protocol limits Remaining Length
718      */
719     private static long decodeVariableByteInteger(ByteBuf buffer) {
720         int remainingLength = 0;
721         int multiplier = 1;
722         short digit;
723         int loops = 0;
724         do {
725             digit = buffer.readUnsignedByte();
726             remainingLength += (digit & 127) * multiplier;
727             multiplier *= 128;
728             loops++;
729         } while ((digit & 128) != 0 && loops < 4);
730 
731         if (loops == 4 && (digit & 128) != 0) {
732             throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
733         }
734         return packInts(remainingLength, loops);
735     }
736 
737     private static final class Result<T> {
738 
739         private final T value;
740         private final int numberOfBytesConsumed;
741 
742         Result(T value, int numberOfBytesConsumed) {
743             this.value = value;
744             this.numberOfBytesConsumed = numberOfBytesConsumed;
745         }
746     }
747 
748     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
749         final long propertiesLength = decodeVariableByteInteger(buffer);
750         int totalPropertiesLength = unpackA(propertiesLength);
751         int numberOfBytesConsumed = unpackB(propertiesLength);
752         if (buffer.readableBytes() < totalPropertiesLength) {
753             // Force an early REPLAY to avoid repeatedly parsing the properties.
754             buffer.readSlice(totalPropertiesLength);
755         }
756 
757         MqttProperties decodedProperties = new MqttProperties();
758         while (numberOfBytesConsumed < totalPropertiesLength) {
759             long propertyId = decodeVariableByteInteger(buffer);
760             final int propertyIdValue = unpackA(propertyId);
761             numberOfBytesConsumed += unpackB(propertyId);
762             MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
763             switch (propertyType) {
764                 case PAYLOAD_FORMAT_INDICATOR:
765                 case REQUEST_PROBLEM_INFORMATION:
766                 case REQUEST_RESPONSE_INFORMATION:
767                 case MAXIMUM_QOS:
768                 case RETAIN_AVAILABLE:
769                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
770                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
771                 case SHARED_SUBSCRIPTION_AVAILABLE:
772                     final int b1 = buffer.readUnsignedByte();
773                     numberOfBytesConsumed++;
774                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
775                     break;
776                 case SERVER_KEEP_ALIVE:
777                 case RECEIVE_MAXIMUM:
778                 case TOPIC_ALIAS_MAXIMUM:
779                 case TOPIC_ALIAS:
780                     final int int2BytesResult = decodeMsbLsb(buffer);
781                     numberOfBytesConsumed += 2;
782                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
783                     break;
784                 case PUBLICATION_EXPIRY_INTERVAL:
785                 case SESSION_EXPIRY_INTERVAL:
786                 case WILL_DELAY_INTERVAL:
787                 case MAXIMUM_PACKET_SIZE:
788                     final int maxPacketSize = buffer.readInt();
789                     numberOfBytesConsumed += 4;
790                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
791                     break;
792                 case SUBSCRIPTION_IDENTIFIER:
793                     long vbIntegerResult = decodeVariableByteInteger(buffer);
794                     numberOfBytesConsumed += unpackB(vbIntegerResult);
795                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
796                     break;
797                 case CONTENT_TYPE:
798                 case RESPONSE_TOPIC:
799                 case ASSIGNED_CLIENT_IDENTIFIER:
800                 case AUTHENTICATION_METHOD:
801                 case RESPONSE_INFORMATION:
802                 case SERVER_REFERENCE:
803                 case REASON_STRING:
804                     final Result<String> stringResult = decodeString(buffer);
805                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
806                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
807                     break;
808                 case USER_PROPERTY:
809                     final Result<String> keyResult = decodeString(buffer);
810                     final Result<String> valueResult = decodeString(buffer);
811                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
812                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
813                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
814                     break;
815                 case CORRELATION_DATA:
816                 case AUTHENTICATION_DATA:
817                     final byte[] binaryDataResult = decodeByteArray(buffer);
818                     numberOfBytesConsumed += binaryDataResult.length + 2;
819                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
820                     break;
821                 default:
822                     //shouldn't reach here
823                     throw new DecoderException("Unknown property type: " + propertyType);
824             }
825         }
826 
827         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
828     }
829 }