View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.Signal;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40  import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
41  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
42  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
43  import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
44  import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
45  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
46  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
47  import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
48  import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
49  import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
50  import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
51  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
52  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
53  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
54  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
55  import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
56  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
57  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
58  import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
59  import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
60  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
61  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
62  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
63  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
64  import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
65  import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
66  import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
67  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
68  
69  /**
70   * Decodes Mqtt messages from bytes, following
71   * the MQTT protocol specification
72   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
73   * or
74   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
75   * version specified in the CONNECT message that first goes through the channel.
76   */
77  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
78  
79      /**
80       * States of the decoder.
81       * We start at READ_FIXED_HEADER, followed by
82       * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
83       */
84      enum DecoderState {
85          READ_FIXED_HEADER,
86          READ_VARIABLE_HEADER,
87          READ_PAYLOAD,
88          BAD_MESSAGE,
89      }
90  
91      private MqttFixedHeader mqttFixedHeader;
92      private Object variableHeader;
93      private int bytesRemainingInVariablePart;
94  
95      private final int maxBytesInMessage;
96      private final int maxClientIdLength;
97  
98      public MqttDecoder() {
99        this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
100     }
101 
102     public MqttDecoder(int maxBytesInMessage) {
103         this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
104     }
105 
106     public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
107         super(DecoderState.READ_FIXED_HEADER);
108         this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
109         this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
110     }
111 
112     @Override
113     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
114         switch (state()) {
115             case READ_FIXED_HEADER: try {
116                 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
117                 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
118                 checkpoint(DecoderState.READ_VARIABLE_HEADER);
119                 // fall through
120             } catch (Exception cause) {
121                 out.add(invalidMessage(cause));
122                 return;
123             }
124 
125             case READ_VARIABLE_HEADER:  try {
126                 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
127                 int initialAvailableBytes = buffer.readableBytes();
128                 boolean bailOut = false;
129                 try {
130                     variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
131                 } catch (Signal signal) {
132                     if (initialAvailableBytes < maxBytesInMessage) {
133                         // Ask for REPLAY if the buffer was less than maxBytesInMessage
134                         throw signal;
135                     } else {
136                         // We couldn't parse the complete message, and it's already too large.
137                         // Swallow the Signal (we don't need more data) and instead bail out
138                         // and throw the TooLongFrameException below.
139                         bailOut = true;
140                     }
141                 }
142                 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
143                     buffer.skipBytes(actualReadableBytes());
144                     throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
145                             + bytesRemainingBeforeVariableHeader);
146                 }
147                 checkpoint(DecoderState.READ_PAYLOAD);
148                 // fall through
149             } catch (Exception cause) {
150                 out.add(invalidMessage(cause));
151                 return;
152             }
153 
154             case READ_PAYLOAD: try {
155                 final Object decodedPayload =
156                         decodePayload(
157                                 ctx,
158                                 buffer,
159                                 mqttFixedHeader.messageType(),
160                                 maxClientIdLength,
161                                 variableHeader);
162                 checkpoint(DecoderState.READ_FIXED_HEADER);
163                 MqttMessage message = MqttMessageFactory.newMessage(
164                         mqttFixedHeader, variableHeader, decodedPayload);
165                 mqttFixedHeader = null;
166                 variableHeader = null;
167                 out.add(message);
168                 break;
169             } catch (Exception cause) {
170                 out.add(invalidMessage(cause));
171                 return;
172             }
173 
174             case BAD_MESSAGE:
175                 // Keep discarding until disconnection.
176                 buffer.skipBytes(actualReadableBytes());
177                 break;
178 
179             default:
180                 // Shouldn't reach here.
181                 throw new Error("Unexpected mqtt decoder state: " + state());
182         }
183     }
184 
185     private MqttMessage invalidMessage(Throwable cause) {
186       checkpoint(DecoderState.BAD_MESSAGE);
187       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
188     }
189 
190     /**
191      * Decodes the fixed header. It's one byte for the flags and then variable
192      * bytes for the remaining length.
193      *
194      * @see
195      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
196      *
197      * @param buffer the buffer to decode from
198      * @return the fixed header
199      */
200     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
201         short b1 = buffer.readUnsignedByte();
202 
203         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
204         boolean dupFlag = (b1 & 0x08) == 0x08;
205         int qosLevel = (b1 & 0x06) >> 1;
206         boolean retain = (b1 & 0x01) != 0;
207 
208         switch (messageType) {
209             case PUBLISH:
210                 if (qosLevel == 3) {
211                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
212                             + qosLevel + ')');
213                 }
214                 break;
215 
216             case PUBREL:
217             case SUBSCRIBE:
218             case UNSUBSCRIBE:
219                 if (dupFlag) {
220                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
221                             + " message, must be 0, found 1");
222                 }
223                 if (qosLevel != 1) {
224                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
225                             + " message, must be 1, found " + qosLevel);
226                 }
227                 if (retain) {
228                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
229                             + " message, must be 0, found 1");
230                 }
231                 break;
232 
233             case AUTH:
234             case CONNACK:
235             case CONNECT:
236             case DISCONNECT:
237             case PINGREQ:
238             case PINGRESP:
239             case PUBACK:
240             case PUBCOMP:
241             case PUBREC:
242             case SUBACK:
243             case UNSUBACK:
244                 if (dupFlag) {
245                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
246                             + " message, must be 0, found 1");
247                 }
248                 if (qosLevel != 0) {
249                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
250                             + " message, must be 0, found " + qosLevel);
251                 }
252                 if (retain) {
253                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
254                             + " message, must be 0, found 1");
255                 }
256                 break;
257             default:
258                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
259         }
260 
261         int remainingLength = parseRemainingLength(buffer, messageType);
262         MqttFixedHeader decodedFixedHeader =
263                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
264         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
265     }
266 
267     private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
268         int remainingLength = 0;
269         int multiplier = 1;
270 
271         for (int i = 0; i < 4; i++) {
272             short digit = buffer.readUnsignedByte();
273             remainingLength += (digit & 127) * multiplier;
274 
275             if ((digit & 128) == 0) {
276                 return remainingLength;
277             }
278 
279             multiplier *= 128;
280         }
281 
282         // MQTT protocol limits Remaining Length to 4 bytes
283         throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
284     }
285 
286     /**
287      * Decodes the variable header (if any)
288      * @param buffer the buffer to decode from
289      * @param mqttFixedHeader MqttFixedHeader of the same message
290      * @return the variable header
291      */
292     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
293         switch (mqttFixedHeader.messageType()) {
294             case CONNECT:
295                 return decodeConnectionVariableHeader(ctx, buffer);
296 
297             case CONNACK:
298                 return decodeConnAckVariableHeader(ctx, buffer);
299 
300             case UNSUBSCRIBE:
301             case SUBSCRIBE:
302             case SUBACK:
303             case UNSUBACK:
304                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
305 
306             case PUBACK:
307             case PUBREC:
308             case PUBCOMP:
309             case PUBREL:
310                 return decodePubReplyMessage(buffer);
311 
312             case PUBLISH:
313                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
314 
315             case DISCONNECT:
316             case AUTH:
317                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
318 
319             case PINGREQ:
320             case PINGRESP:
321                 // Empty variable header
322                 return null;
323             default:
324                 //shouldn't reach here
325                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
326         }
327     }
328 
329     private MqttConnectVariableHeader decodeConnectionVariableHeader(
330             ChannelHandlerContext ctx,
331             ByteBuf buffer) {
332         final Result<String> protoString = decodeString(buffer);
333         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
334 
335         final byte protocolLevel = buffer.readByte();
336         numberOfBytesConsumed += 1;
337 
338         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
339         MqttCodecUtil.setMqttVersion(ctx, version);
340 
341         final int b1 = buffer.readUnsignedByte();
342         numberOfBytesConsumed += 1;
343 
344         final int keepAlive = decodeMsbLsb(buffer);
345         numberOfBytesConsumed += 2;
346 
347         final boolean hasUserName = (b1 & 0x80) == 0x80;
348         final boolean hasPassword = (b1 & 0x40) == 0x40;
349         final boolean willRetain = (b1 & 0x20) == 0x20;
350         final int willQos = (b1 & 0x18) >> 3;
351         final boolean willFlag = (b1 & 0x04) == 0x04;
352         final boolean cleanSession = (b1 & 0x02) == 0x02;
353         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
354             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
355             if (!zeroReservedFlag) {
356                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
357                 // set to zero and disconnect the Client if it is not zero.
358                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
359                 throw new DecoderException("non-zero reserved flag");
360             }
361         }
362 
363         final MqttProperties properties;
364         if (version == MqttVersion.MQTT_5) {
365             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
366             properties = propertiesResult.value;
367             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
368         } else {
369             properties = MqttProperties.NO_PROPERTIES;
370         }
371 
372         bytesRemainingInVariablePart -= numberOfBytesConsumed;
373         return new MqttConnectVariableHeader(
374                 version.protocolName(),
375                 version.protocolLevel(),
376                 hasUserName,
377                 hasPassword,
378                 willRetain,
379                 willQos,
380                 willFlag,
381                 cleanSession,
382                 keepAlive,
383                 properties);
384     }
385 
386     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
387             ChannelHandlerContext ctx,
388             ByteBuf buffer) {
389         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
390         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
391         byte returnCode = buffer.readByte();
392 
393         final MqttProperties properties;
394         if (mqttVersion == MqttVersion.MQTT_5) {
395             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
396             properties = propertiesResult.value;
397             bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
398         } else {
399             properties = MqttProperties.NO_PROPERTIES;
400             bytesRemainingInVariablePart -= 2;
401         }
402 
403         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
404     }
405 
406     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
407             ChannelHandlerContext ctx,
408             ByteBuf buffer) {
409         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
410         final int packetId = decodeMessageId(buffer);
411 
412         if (mqttVersion == MqttVersion.MQTT_5) {
413             final Result<MqttProperties> properties = decodeProperties(buffer);
414             bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
415             return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
416         } else {
417             bytesRemainingInVariablePart -= 2;
418             return new MqttMessageIdAndPropertiesVariableHeader(packetId,
419                                                                 MqttProperties.NO_PROPERTIES);
420         }
421     }
422 
423     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
424         final int packetId = decodeMessageId(buffer);
425 
426         final int packetIdNumberOfBytesConsumed = 2;
427         if (bytesRemainingInVariablePart > 3) {
428             final byte reasonCode = buffer.readByte();
429             final Result<MqttProperties> properties = decodeProperties(buffer);
430             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
431             return new MqttPubReplyMessageVariableHeader(packetId,
432                     reasonCode,
433                     properties.value);
434         } else if (bytesRemainingInVariablePart > 2) {
435             final byte reasonCode = buffer.readByte();
436             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
437             return new MqttPubReplyMessageVariableHeader(packetId,
438                     reasonCode,
439                     MqttProperties.NO_PROPERTIES);
440         } else {
441             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
442             return new MqttPubReplyMessageVariableHeader(packetId,
443                     (byte) 0,
444                     MqttProperties.NO_PROPERTIES);
445         }
446     }
447 
448     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
449             ByteBuf buffer) {
450         final byte reasonCode;
451         final MqttProperties properties;
452         if (bytesRemainingInVariablePart > 1) {
453             reasonCode = buffer.readByte();
454             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
455             properties = propertiesResult.value;
456             bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
457         } else if (bytesRemainingInVariablePart > 0) {
458             reasonCode = buffer.readByte();
459             properties = MqttProperties.NO_PROPERTIES;
460             --bytesRemainingInVariablePart;
461         } else {
462             reasonCode = 0;
463             properties = MqttProperties.NO_PROPERTIES;
464         }
465 
466         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
467     }
468 
469     private MqttPublishVariableHeader decodePublishVariableHeader(
470             ChannelHandlerContext ctx,
471             ByteBuf buffer,
472             MqttFixedHeader mqttFixedHeader) {
473         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
474         final Result<String> decodedTopic = decodeString(buffer);
475         if (!isValidPublishTopicName(decodedTopic.value)) {
476             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
477         }
478         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
479 
480         int messageId = -1;
481         if (mqttFixedHeader.qosLevel().value() > 0) {
482             messageId = decodeMessageId(buffer);
483             numberOfBytesConsumed += 2;
484         }
485 
486         final MqttProperties properties;
487         if (mqttVersion == MqttVersion.MQTT_5) {
488             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
489             properties = propertiesResult.value;
490             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
491         } else {
492             properties = MqttProperties.NO_PROPERTIES;
493         }
494 
495         bytesRemainingInVariablePart -= numberOfBytesConsumed;
496         return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
497     }
498 
499     /**
500      * @return messageId with numberOfBytesConsumed is 2
501      */
502     private static int decodeMessageId(ByteBuf buffer) {
503         final int messageId = decodeMsbLsb(buffer);
504         if (!isValidMessageId(messageId)) {
505             throw new DecoderException("invalid messageId: " + messageId);
506         }
507         return messageId;
508     }
509 
510     /**
511      * Decodes the payload.
512      *
513      * @param buffer the buffer to decode from
514      * @param messageType  type of the message being decoded
515      * @param variableHeader variable header of the same message
516      * @return the payload
517      */
518     private Object decodePayload(
519             ChannelHandlerContext ctx,
520             ByteBuf buffer,
521             MqttMessageType messageType,
522             int maxClientIdLength,
523             Object variableHeader) {
524         switch (messageType) {
525             case CONNECT:
526                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
527 
528             case SUBSCRIBE:
529                 return decodeSubscribePayload(buffer);
530 
531             case SUBACK:
532                 return decodeSubackPayload(buffer);
533 
534             case UNSUBSCRIBE:
535                 return decodeUnsubscribePayload(buffer);
536 
537             case UNSUBACK:
538                 return decodeUnsubAckPayload(ctx, buffer);
539 
540             case PUBLISH:
541                 return decodePublishPayload(buffer);
542 
543             default:
544                 // unknown payload , no byte consumed
545                 return null;
546         }
547     }
548 
549     private MqttConnectPayload decodeConnectionPayload(
550             ByteBuf buffer,
551             int maxClientIdLength,
552             MqttConnectVariableHeader mqttConnectVariableHeader) {
553         final Result<String> decodedClientId = decodeString(buffer);
554         final String decodedClientIdValue = decodedClientId.value;
555         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
556                 (byte) mqttConnectVariableHeader.version());
557         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
558             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
559         }
560         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
561 
562         Result<String> decodedWillTopic = null;
563         byte[] decodedWillMessage = null;
564 
565         final MqttProperties willProperties;
566         if (mqttConnectVariableHeader.isWillFlag()) {
567             if (mqttVersion == MqttVersion.MQTT_5) {
568                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
569                 willProperties = propertiesResult.value;
570                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
571             } else {
572                 willProperties = MqttProperties.NO_PROPERTIES;
573             }
574             decodedWillTopic = decodeString(buffer, 0, 32767);
575             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
576             decodedWillMessage = decodeByteArray(buffer);
577             numberOfBytesConsumed += decodedWillMessage.length + 2;
578         } else {
579             willProperties = MqttProperties.NO_PROPERTIES;
580         }
581         Result<String> decodedUserName = null;
582         byte[] decodedPassword = null;
583         if (mqttConnectVariableHeader.hasUserName()) {
584             decodedUserName = decodeString(buffer);
585             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
586         }
587         if (mqttConnectVariableHeader.hasPassword()) {
588             decodedPassword = decodeByteArray(buffer);
589             numberOfBytesConsumed += decodedPassword.length + 2;
590         }
591 
592         validateNoBytesRemain(numberOfBytesConsumed);
593         return new MqttConnectPayload(
594                         decodedClientId.value,
595                         willProperties,
596                         decodedWillTopic != null ? decodedWillTopic.value : null,
597                         decodedWillMessage,
598                         decodedUserName != null ? decodedUserName.value : null,
599                         decodedPassword);
600     }
601 
602     private MqttSubscribePayload decodeSubscribePayload(
603             ByteBuf buffer) {
604         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
605         int numberOfBytesConsumed = 0;
606         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
607             final Result<String> decodedTopicName = decodeString(buffer);
608             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
609             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
610             final short optionByte = buffer.readUnsignedByte();
611 
612             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
613             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
614             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
615             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
616 
617             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
618                     noLocal,
619                     retainAsPublished,
620                     retainHandling);
621 
622             numberOfBytesConsumed++;
623             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
624         }
625         validateNoBytesRemain(numberOfBytesConsumed);
626         return new MqttSubscribePayload(subscribeTopics);
627     }
628 
629     private MqttSubAckPayload decodeSubackPayload(
630             ByteBuf buffer) {
631         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
632         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
633         int numberOfBytesConsumed = 0;
634         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
635             int reasonCode = buffer.readUnsignedByte();
636             numberOfBytesConsumed++;
637             grantedQos.add(reasonCode);
638         }
639         validateNoBytesRemain(numberOfBytesConsumed);
640         return new MqttSubAckPayload(grantedQos);
641     }
642 
643     private MqttUnsubAckPayload decodeUnsubAckPayload(
644         ChannelHandlerContext ctx,
645         ByteBuf buffer) {
646         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
647         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
648         int numberOfBytesConsumed = 0;
649         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
650             short reasonCode = buffer.readUnsignedByte();
651             numberOfBytesConsumed++;
652             reasonCodes.add(reasonCode);
653         }
654         validateNoBytesRemain(numberOfBytesConsumed);
655         return new MqttUnsubAckPayload(reasonCodes);
656     }
657 
658     private MqttUnsubscribePayload decodeUnsubscribePayload(
659             ByteBuf buffer) {
660         final List<String> unsubscribeTopics = new ArrayList<String>();
661         int numberOfBytesConsumed = 0;
662         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
663             final Result<String> decodedTopicName = decodeString(buffer);
664             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
665             unsubscribeTopics.add(decodedTopicName.value);
666         }
667         validateNoBytesRemain(numberOfBytesConsumed);
668         return new MqttUnsubscribePayload(unsubscribeTopics);
669     }
670 
671     private ByteBuf decodePublishPayload(ByteBuf buffer) {
672         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
673     }
674 
675     private void validateNoBytesRemain(int numberOfBytesConsumed) {
676         bytesRemainingInVariablePart -= numberOfBytesConsumed;
677         if (bytesRemainingInVariablePart != 0) {
678             throw new DecoderException(
679                     "non-zero remaining payload bytes: " +
680                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
681         }
682     }
683 
684     private static Result<String> decodeString(ByteBuf buffer) {
685         return decodeString(buffer, 0, Integer.MAX_VALUE);
686     }
687 
688     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
689         int size = decodeMsbLsb(buffer);
690         int numberOfBytesConsumed = 2;
691         if (size < minBytes || size > maxBytes) {
692             buffer.skipBytes(size);
693             numberOfBytesConsumed += size;
694             return new Result<String>(null, numberOfBytesConsumed);
695         }
696         String s = buffer.readString(size, CharsetUtil.UTF_8);
697         numberOfBytesConsumed += size;
698         return new Result<String>(s, numberOfBytesConsumed);
699     }
700 
701     /**
702      *
703      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
704      */
705     private static byte[] decodeByteArray(ByteBuf buffer) {
706         int size = decodeMsbLsb(buffer);
707         byte[] bytes = new byte[size];
708         buffer.readBytes(bytes);
709         return bytes;
710     }
711 
712     // packing utils to reduce the amount of garbage while decoding ints
713     private static long packInts(int a, int b) {
714         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
715     }
716 
717     private static int unpackA(long ints) {
718         return (int) (ints >> 32);
719     }
720 
721     private static int unpackB(long ints) {
722         return (int) ints;
723     }
724 
725     /**
726      *  numberOfBytesConsumed = 2. return decoded result.
727      */
728     private static int decodeMsbLsb(ByteBuf buffer) {
729         int min = 0;
730         int max = 65535;
731         short msbSize = buffer.readUnsignedByte();
732         short lsbSize = buffer.readUnsignedByte();
733         int result = msbSize << 8 | lsbSize;
734         if (result < min || result > max) {
735             result = -1;
736         }
737         return result;
738     }
739 
740     /**
741      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
742      *
743      * @param buffer the buffer to decode from
744      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
745      * @throws DecoderException if bad MQTT protocol limits Remaining Length
746      */
747     private static long decodeVariableByteInteger(ByteBuf buffer) {
748         int remainingLength = 0;
749         int multiplier = 1;
750 
751         for (int i = 0; i < 4; i++) {
752             short digit = buffer.readUnsignedByte();
753             remainingLength += (digit & 127) * multiplier;
754 
755             if ((digit & 128) == 0) {
756                 return packInts(remainingLength, i + 1);
757             }
758 
759             multiplier *= 128;
760         }
761 
762         throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
763     }
764 
765     private static final class Result<T> {
766 
767         private final T value;
768         private final int numberOfBytesConsumed;
769 
770         Result(T value, int numberOfBytesConsumed) {
771             this.value = value;
772             this.numberOfBytesConsumed = numberOfBytesConsumed;
773         }
774     }
775 
776     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
777         final long propertiesLength = decodeVariableByteInteger(buffer);
778         int totalPropertiesLength = unpackA(propertiesLength);
779         int numberOfBytesConsumed = unpackB(propertiesLength);
780         if (buffer.readableBytes() < totalPropertiesLength) {
781             // Force an early REPLAY to avoid repeatedly parsing the properties.
782             buffer.readSlice(totalPropertiesLength);
783         }
784 
785         MqttProperties decodedProperties = new MqttProperties();
786         while (numberOfBytesConsumed < totalPropertiesLength) {
787             long propertyId = decodeVariableByteInteger(buffer);
788             final int propertyIdValue = unpackA(propertyId);
789             numberOfBytesConsumed += unpackB(propertyId);
790             switch (propertyIdValue) {
791                 case PAYLOAD_FORMAT_INDICATOR:
792                 case REQUEST_PROBLEM_INFORMATION:
793                 case REQUEST_RESPONSE_INFORMATION:
794                 case MAXIMUM_QOS:
795                 case RETAIN_AVAILABLE:
796                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
797                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
798                 case SHARED_SUBSCRIPTION_AVAILABLE:
799                     final int b1 = buffer.readUnsignedByte();
800                     numberOfBytesConsumed++;
801                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
802                     break;
803                 case SERVER_KEEP_ALIVE:
804                 case RECEIVE_MAXIMUM:
805                 case TOPIC_ALIAS_MAXIMUM:
806                 case TOPIC_ALIAS:
807                     final int int2BytesResult = decodeMsbLsb(buffer);
808                     numberOfBytesConsumed += 2;
809                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
810                     break;
811                 case PUBLICATION_EXPIRY_INTERVAL:
812                 case SESSION_EXPIRY_INTERVAL:
813                 case WILL_DELAY_INTERVAL:
814                 case MAXIMUM_PACKET_SIZE:
815                     final int maxPacketSize = buffer.readInt();
816                     numberOfBytesConsumed += 4;
817                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
818                     break;
819                 case SUBSCRIPTION_IDENTIFIER:
820                     long vbIntegerResult = decodeVariableByteInteger(buffer);
821                     numberOfBytesConsumed += unpackB(vbIntegerResult);
822                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
823                     break;
824                 case CONTENT_TYPE:
825                 case RESPONSE_TOPIC:
826                 case ASSIGNED_CLIENT_IDENTIFIER:
827                 case AUTHENTICATION_METHOD:
828                 case RESPONSE_INFORMATION:
829                 case SERVER_REFERENCE:
830                 case REASON_STRING:
831                     final Result<String> stringResult = decodeString(buffer);
832                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
833                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
834                     break;
835                 case USER_PROPERTY:
836                     final Result<String> keyResult = decodeString(buffer);
837                     final Result<String> valueResult = decodeString(buffer);
838                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
839                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
840                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
841                     break;
842                 case CORRELATION_DATA:
843                 case AUTHENTICATION_DATA:
844                     final byte[] binaryDataResult = decodeByteArray(buffer);
845                     numberOfBytesConsumed += binaryDataResult.length + 2;
846                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
847                     break;
848                 default:
849                     //shouldn't reach here
850                     throw new DecoderException("Unknown property type: " + propertyIdValue);
851             }
852         }
853 
854         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
855     }
856 }