View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.Signal;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
37  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
39  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
40  import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
41  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
42  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
43  import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
44  import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
45  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
46  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
47  import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
48  import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
49  import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
50  import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
51  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
52  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
53  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
54  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
55  import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
56  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
57  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
58  import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
59  import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
60  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
61  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
62  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
63  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
64  import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
65  import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
66  import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
67  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
68  
69  /**
70   * Decodes Mqtt messages from bytes, following
71   * the MQTT protocol specification
72   * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
73   * or
74   * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
75   * version specified in the CONNECT message that first goes through the channel.
76   */
77  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
78  
/**
 * Phases the decoder passes through while parsing one MQTT message.
 * Decoding begins in {@code READ_FIXED_HEADER}, continues with
 * {@code READ_VARIABLE_HEADER} and ends with {@code READ_PAYLOAD};
 * {@code BAD_MESSAGE} is entered once a message could not be decoded.
 */
enum DecoderState {
    /** Initial phase: the fixed header is parsed next. */
    READ_FIXED_HEADER,

    /** The fixed header is known; the variable header is parsed next. */
    READ_VARIABLE_HEADER,

    /** Both headers are known; the payload is parsed next. */
    READ_PAYLOAD,

    /** A message failed to decode; incoming bytes are discarded from here on. */
    BAD_MESSAGE,
}
90  
91      private MqttFixedHeader mqttFixedHeader;
92      private Object variableHeader;
93      private int bytesRemainingInVariablePart;
94  
95      private final int maxBytesInMessage;
96      private final int maxClientIdLength;
97  
/**
 * Creates a decoder limited by {@code DEFAULT_MAX_BYTES_IN_MESSAGE} and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH}.
 */
public MqttDecoder() {
    this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
101 
/**
 * Creates a decoder with a custom message size limit and
 * {@code DEFAULT_MAX_CLIENT_ID_LENGTH} as the client identifier limit.
 *
 * @param maxBytesInMessage largest remaining length accepted for a single message
 */
public MqttDecoder(int maxBytesInMessage) {
    this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
}
105 
106     public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
107         super(DecoderState.READ_FIXED_HEADER);
108         this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
109         this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
110     }
111 
112     @Override
113     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
114         switch (state()) {
115             case READ_FIXED_HEADER: try {
116                 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
117                 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
118                 checkpoint(DecoderState.READ_VARIABLE_HEADER);
119                 // fall through
120             } catch (Exception cause) {
121                 out.add(invalidMessage(cause));
122                 return;
123             }
124 
125             case READ_VARIABLE_HEADER:  try {
126                 int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
127                 boolean bailOut = false;
128                 try {
129                     variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
130                 } catch (Signal signal) {
131                     if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
132                         // We couldn't parse the complete message, and it's already too large.
133                         // Swallow the Signal (we don't need more data) and instead bail out
134                         // and throw the TooLongFrameException below.
135                         bailOut = true;
136                     } else {
137                         // Ask for REPLAY if the current message is within maxBytesInMessage.
138                         throw signal;
139                     }
140                 }
141                 if (bailOut || bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
142                     buffer.skipBytes(actualReadableBytes());
143                     throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
144                             + bytesRemainingBeforeVariableHeader);
145                 }
146                 checkpoint(DecoderState.READ_PAYLOAD);
147                 // fall through
148             } catch (Exception cause) {
149                 out.add(invalidMessage(cause));
150                 return;
151             }
152 
153             case READ_PAYLOAD: try {
154                 final Object decodedPayload =
155                         decodePayload(
156                                 ctx,
157                                 buffer,
158                                 mqttFixedHeader.messageType(),
159                                 maxClientIdLength,
160                                 variableHeader);
161                 checkpoint(DecoderState.READ_FIXED_HEADER);
162                 MqttMessage message = MqttMessageFactory.newMessage(
163                         mqttFixedHeader, variableHeader, decodedPayload);
164                 mqttFixedHeader = null;
165                 variableHeader = null;
166                 out.add(message);
167                 break;
168             } catch (Exception cause) {
169                 out.add(invalidMessage(cause));
170                 return;
171             }
172 
173             case BAD_MESSAGE:
174                 // Keep discarding until disconnection.
175                 buffer.skipBytes(actualReadableBytes());
176                 break;
177 
178             default:
179                 // Shouldn't reach here.
180                 throw new Error("Unexpected mqtt decoder state: " + state());
181         }
182     }
183 
184     private MqttMessage invalidMessage(Throwable cause) {
185       checkpoint(DecoderState.BAD_MESSAGE);
186       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
187     }
188 
189     /**
190      * Decodes the fixed header. It's one byte for the flags and then variable
191      * bytes for the remaining length.
192      *
193      * @see
194      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
195      *
196      * @param buffer the buffer to decode from
197      * @return the fixed header
198      */
199     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
200         short b1 = buffer.readUnsignedByte();
201 
202         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
203         boolean dupFlag = (b1 & 0x08) == 0x08;
204         int qosLevel = (b1 & 0x06) >> 1;
205         boolean retain = (b1 & 0x01) != 0;
206 
207         switch (messageType) {
208             case PUBLISH:
209                 if (qosLevel == 3) {
210                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
211                             + qosLevel + ')');
212                 }
213                 break;
214 
215             case PUBREL:
216             case SUBSCRIBE:
217             case UNSUBSCRIBE:
218                 if (dupFlag) {
219                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
220                             + " message, must be 0, found 1");
221                 }
222                 if (qosLevel != 1) {
223                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
224                             + " message, must be 1, found " + qosLevel);
225                 }
226                 if (retain) {
227                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
228                             + " message, must be 0, found 1");
229                 }
230                 break;
231 
232             case AUTH:
233             case CONNACK:
234             case CONNECT:
235             case DISCONNECT:
236             case PINGREQ:
237             case PINGRESP:
238             case PUBACK:
239             case PUBCOMP:
240             case PUBREC:
241             case SUBACK:
242             case UNSUBACK:
243                 if (dupFlag) {
244                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
245                             + " message, must be 0, found 1");
246                 }
247                 if (qosLevel != 0) {
248                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
249                             + " message, must be 0, found " + qosLevel);
250                 }
251                 if (retain) {
252                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
253                             + " message, must be 0, found 1");
254                 }
255                 break;
256             default:
257                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
258         }
259 
260         int remainingLength = parseRemainingLength(buffer, messageType);
261         MqttFixedHeader decodedFixedHeader =
262                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
263         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
264     }
265 
266     private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
267         int remainingLength = 0;
268         int multiplier = 1;
269 
270         for (int i = 0; i < 4; i++) {
271             short digit = buffer.readUnsignedByte();
272             remainingLength += (digit & 127) * multiplier;
273 
274             if ((digit & 128) == 0) {
275                 return remainingLength;
276             }
277 
278             multiplier *= 128;
279         }
280 
281         // MQTT protocol limits Remaining Length to 4 bytes
282         throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
283     }
284 
285     /**
286      * Decodes the variable header (if any)
287      * @param buffer the buffer to decode from
288      * @param mqttFixedHeader MqttFixedHeader of the same message
289      * @return the variable header
290      */
291     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
292         switch (mqttFixedHeader.messageType()) {
293             case CONNECT:
294                 return decodeConnectionVariableHeader(ctx, buffer);
295 
296             case CONNACK:
297                 return decodeConnAckVariableHeader(ctx, buffer);
298 
299             case UNSUBSCRIBE:
300             case SUBSCRIBE:
301             case SUBACK:
302             case UNSUBACK:
303                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
304 
305             case PUBACK:
306             case PUBREC:
307             case PUBCOMP:
308             case PUBREL:
309                 return decodePubReplyMessage(buffer);
310 
311             case PUBLISH:
312                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
313 
314             case DISCONNECT:
315             case AUTH:
316                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
317 
318             case PINGREQ:
319             case PINGRESP:
320                 // Empty variable header
321                 return null;
322             default:
323                 //shouldn't reach here
324                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
325         }
326     }
327 
328     private MqttConnectVariableHeader decodeConnectionVariableHeader(
329             ChannelHandlerContext ctx,
330             ByteBuf buffer) {
331         final Result<String> protoString = decodeString(buffer);
332         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
333 
334         final byte protocolLevel = buffer.readByte();
335         numberOfBytesConsumed += 1;
336 
337         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
338         MqttCodecUtil.setMqttVersion(ctx, version);
339 
340         final int b1 = buffer.readUnsignedByte();
341         numberOfBytesConsumed += 1;
342 
343         final int keepAlive = decodeMsbLsb(buffer);
344         numberOfBytesConsumed += 2;
345 
346         final boolean hasUserName = (b1 & 0x80) == 0x80;
347         final boolean hasPassword = (b1 & 0x40) == 0x40;
348         final boolean willRetain = (b1 & 0x20) == 0x20;
349         final int willQos = (b1 & 0x18) >> 3;
350         final boolean willFlag = (b1 & 0x04) == 0x04;
351         final boolean cleanSession = (b1 & 0x02) == 0x02;
352         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
353             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
354             if (!zeroReservedFlag) {
355                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
356                 // set to zero and disconnect the Client if it is not zero.
357                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
358                 throw new DecoderException("non-zero reserved flag");
359             }
360         }
361 
362         final MqttProperties properties;
363         if (version == MqttVersion.MQTT_5) {
364             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
365             properties = propertiesResult.value;
366             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
367         } else {
368             properties = MqttProperties.NO_PROPERTIES;
369         }
370 
371         bytesRemainingInVariablePart -= numberOfBytesConsumed;
372         return new MqttConnectVariableHeader(
373                 version.protocolName(),
374                 version.protocolLevel(),
375                 hasUserName,
376                 hasPassword,
377                 willRetain,
378                 willQos,
379                 willFlag,
380                 cleanSession,
381                 keepAlive,
382                 properties);
383     }
384 
385     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
386             ChannelHandlerContext ctx,
387             ByteBuf buffer) {
388         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
389         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
390         byte returnCode = buffer.readByte();
391 
392         final MqttProperties properties;
393         if (mqttVersion == MqttVersion.MQTT_5) {
394             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
395             properties = propertiesResult.value;
396             bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
397         } else {
398             properties = MqttProperties.NO_PROPERTIES;
399             bytesRemainingInVariablePart -= 2;
400         }
401 
402         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
403     }
404 
405     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
406             ChannelHandlerContext ctx,
407             ByteBuf buffer) {
408         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
409         final int packetId = decodeMessageId(buffer);
410 
411         if (mqttVersion == MqttVersion.MQTT_5) {
412             final Result<MqttProperties> properties = decodeProperties(buffer);
413             bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
414             return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
415         } else {
416             bytesRemainingInVariablePart -= 2;
417             return new MqttMessageIdAndPropertiesVariableHeader(packetId,
418                                                                 MqttProperties.NO_PROPERTIES);
419         }
420     }
421 
422     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
423         final int packetId = decodeMessageId(buffer);
424 
425         final int packetIdNumberOfBytesConsumed = 2;
426         if (bytesRemainingInVariablePart > 3) {
427             final byte reasonCode = buffer.readByte();
428             final Result<MqttProperties> properties = decodeProperties(buffer);
429             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
430             return new MqttPubReplyMessageVariableHeader(packetId,
431                     reasonCode,
432                     properties.value);
433         } else if (bytesRemainingInVariablePart > 2) {
434             final byte reasonCode = buffer.readByte();
435             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
436             return new MqttPubReplyMessageVariableHeader(packetId,
437                     reasonCode,
438                     MqttProperties.NO_PROPERTIES);
439         } else {
440             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
441             return new MqttPubReplyMessageVariableHeader(packetId,
442                     (byte) 0,
443                     MqttProperties.NO_PROPERTIES);
444         }
445     }
446 
447     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
448             ByteBuf buffer) {
449         final byte reasonCode;
450         final MqttProperties properties;
451         if (bytesRemainingInVariablePart > 1) {
452             reasonCode = buffer.readByte();
453             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
454             properties = propertiesResult.value;
455             bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
456         } else if (bytesRemainingInVariablePart > 0) {
457             reasonCode = buffer.readByte();
458             properties = MqttProperties.NO_PROPERTIES;
459             --bytesRemainingInVariablePart;
460         } else {
461             reasonCode = 0;
462             properties = MqttProperties.NO_PROPERTIES;
463         }
464 
465         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
466     }
467 
468     private MqttPublishVariableHeader decodePublishVariableHeader(
469             ChannelHandlerContext ctx,
470             ByteBuf buffer,
471             MqttFixedHeader mqttFixedHeader) {
472         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
473         final Result<String> decodedTopic = decodeString(buffer);
474         if (!isValidPublishTopicName(decodedTopic.value)) {
475             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
476         }
477         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
478 
479         int messageId = -1;
480         if (mqttFixedHeader.qosLevel().value() > 0) {
481             messageId = decodeMessageId(buffer);
482             numberOfBytesConsumed += 2;
483         }
484 
485         final MqttProperties properties;
486         if (mqttVersion == MqttVersion.MQTT_5) {
487             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
488             properties = propertiesResult.value;
489             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
490         } else {
491             properties = MqttProperties.NO_PROPERTIES;
492         }
493 
494         bytesRemainingInVariablePart -= numberOfBytesConsumed;
495         return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
496     }
497 
498     /**
499      * @return messageId with numberOfBytesConsumed is 2
500      */
501     private static int decodeMessageId(ByteBuf buffer) {
502         final int messageId = decodeMsbLsb(buffer);
503         if (!isValidMessageId(messageId)) {
504             throw new DecoderException("invalid messageId: " + messageId);
505         }
506         return messageId;
507     }
508 
509     /**
510      * Decodes the payload.
511      *
512      * @param buffer the buffer to decode from
513      * @param messageType  type of the message being decoded
514      * @param variableHeader variable header of the same message
515      * @return the payload
516      */
517     private Object decodePayload(
518             ChannelHandlerContext ctx,
519             ByteBuf buffer,
520             MqttMessageType messageType,
521             int maxClientIdLength,
522             Object variableHeader) {
523         switch (messageType) {
524             case CONNECT:
525                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
526 
527             case SUBSCRIBE:
528                 return decodeSubscribePayload(buffer);
529 
530             case SUBACK:
531                 return decodeSubackPayload(buffer);
532 
533             case UNSUBSCRIBE:
534                 return decodeUnsubscribePayload(buffer);
535 
536             case UNSUBACK:
537                 return decodeUnsubAckPayload(ctx, buffer);
538 
539             case PUBLISH:
540                 return decodePublishPayload(buffer);
541 
542             default:
543                 // No payload for this message type. If the fixed header's Remaining Length
544                 // claimed bytes beyond what the variable header consumed (e.g. a PINGREQ
545                 // with non-zero Remaining Length), the frame is malformed.
546                 // See https://github.com/netty/netty/issues/16851
547                 validateNoBytesRemain(0);
548                 return null;
549         }
550     }
551 
552     private MqttConnectPayload decodeConnectionPayload(
553             ByteBuf buffer,
554             int maxClientIdLength,
555             MqttConnectVariableHeader mqttConnectVariableHeader) {
556         final Result<String> decodedClientId = decodeString(buffer);
557         final String decodedClientIdValue = decodedClientId.value;
558         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
559                 (byte) mqttConnectVariableHeader.version());
560         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
561             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
562         }
563         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
564 
565         Result<String> decodedWillTopic = null;
566         byte[] decodedWillMessage = null;
567 
568         final MqttProperties willProperties;
569         if (mqttConnectVariableHeader.isWillFlag()) {
570             if (mqttVersion == MqttVersion.MQTT_5) {
571                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
572                 willProperties = propertiesResult.value;
573                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
574             } else {
575                 willProperties = MqttProperties.NO_PROPERTIES;
576             }
577             decodedWillTopic = decodeString(buffer, 0, 32767);
578             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
579             decodedWillMessage = decodeByteArray(buffer);
580             numberOfBytesConsumed += decodedWillMessage.length + 2;
581         } else {
582             willProperties = MqttProperties.NO_PROPERTIES;
583         }
584         Result<String> decodedUserName = null;
585         byte[] decodedPassword = null;
586         if (mqttConnectVariableHeader.hasUserName()) {
587             decodedUserName = decodeString(buffer);
588             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
589         }
590         if (mqttConnectVariableHeader.hasPassword()) {
591             decodedPassword = decodeByteArray(buffer);
592             numberOfBytesConsumed += decodedPassword.length + 2;
593         }
594 
595         validateNoBytesRemain(numberOfBytesConsumed);
596         return new MqttConnectPayload(
597                         decodedClientId.value,
598                         willProperties,
599                         decodedWillTopic != null ? decodedWillTopic.value : null,
600                         decodedWillMessage,
601                         decodedUserName != null ? decodedUserName.value : null,
602                         decodedPassword);
603     }
604 
605     private MqttSubscribePayload decodeSubscribePayload(
606             ByteBuf buffer) {
607         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
608         int numberOfBytesConsumed = 0;
609         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
610             final Result<String> decodedTopicName = decodeString(buffer);
611             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
612             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
613             final short optionByte = buffer.readUnsignedByte();
614 
615             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
616             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
617             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
618             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
619 
620             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
621                     noLocal,
622                     retainAsPublished,
623                     retainHandling);
624 
625             numberOfBytesConsumed++;
626             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
627         }
628         validateNoBytesRemain(numberOfBytesConsumed);
629         return new MqttSubscribePayload(subscribeTopics);
630     }
631 
632     private MqttSubAckPayload decodeSubackPayload(
633             ByteBuf buffer) {
634         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
635         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
636         int numberOfBytesConsumed = 0;
637         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
638             int reasonCode = buffer.readUnsignedByte();
639             numberOfBytesConsumed++;
640             grantedQos.add(reasonCode);
641         }
642         validateNoBytesRemain(numberOfBytesConsumed);
643         return new MqttSubAckPayload(grantedQos);
644     }
645 
646     private MqttUnsubAckPayload decodeUnsubAckPayload(
647         ChannelHandlerContext ctx,
648         ByteBuf buffer) {
649         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
650         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
651         int numberOfBytesConsumed = 0;
652         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
653             short reasonCode = buffer.readUnsignedByte();
654             numberOfBytesConsumed++;
655             reasonCodes.add(reasonCode);
656         }
657         validateNoBytesRemain(numberOfBytesConsumed);
658         return new MqttUnsubAckPayload(reasonCodes);
659     }
660 
661     private MqttUnsubscribePayload decodeUnsubscribePayload(
662             ByteBuf buffer) {
663         final List<String> unsubscribeTopics = new ArrayList<String>();
664         int numberOfBytesConsumed = 0;
665         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
666             final Result<String> decodedTopicName = decodeString(buffer);
667             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
668             unsubscribeTopics.add(decodedTopicName.value);
669         }
670         validateNoBytesRemain(numberOfBytesConsumed);
671         return new MqttUnsubscribePayload(unsubscribeTopics);
672     }
673 
674     private ByteBuf decodePublishPayload(ByteBuf buffer) {
675         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
676     }
677 
678     private void validateNoBytesRemain(int numberOfBytesConsumed) {
679         bytesRemainingInVariablePart -= numberOfBytesConsumed;
680         if (bytesRemainingInVariablePart != 0) {
681             throw new DecoderException(
682                     "non-zero remaining payload bytes: " +
683                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
684         }
685     }
686 
687     private static Result<String> decodeString(ByteBuf buffer) {
688         return decodeString(buffer, 0, Integer.MAX_VALUE);
689     }
690 
691     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
692         int size = decodeMsbLsb(buffer);
693         int numberOfBytesConsumed = 2;
694         if (size < minBytes || size > maxBytes) {
695             buffer.skipBytes(size);
696             numberOfBytesConsumed += size;
697             return new Result<String>(null, numberOfBytesConsumed);
698         }
699         String s = buffer.readString(size, CharsetUtil.UTF_8);
700         numberOfBytesConsumed += size;
701         return new Result<String>(s, numberOfBytesConsumed);
702     }
703 
704     /**
705      *
706      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
707      */
708     private static byte[] decodeByteArray(ByteBuf buffer) {
709         int size = decodeMsbLsb(buffer);
710         byte[] bytes = new byte[size];
711         buffer.readBytes(bytes);
712         return bytes;
713     }
714 
715     // packing utils to reduce the amount of garbage while decoding ints
716     private static long packInts(int a, int b) {
717         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
718     }
719 
720     private static int unpackA(long ints) {
721         return (int) (ints >> 32);
722     }
723 
724     private static int unpackB(long ints) {
725         return (int) ints;
726     }
727 
728     /**
729      *  numberOfBytesConsumed = 2. return decoded result.
730      */
731     private static int decodeMsbLsb(ByteBuf buffer) {
732         int min = 0;
733         int max = 65535;
734         short msbSize = buffer.readUnsignedByte();
735         short lsbSize = buffer.readUnsignedByte();
736         int result = msbSize << 8 | lsbSize;
737         if (result < min || result > max) {
738             result = -1;
739         }
740         return result;
741     }
742 
743     /**
744      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
745      *
746      * @param buffer the buffer to decode from
747      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
748      * @throws DecoderException if bad MQTT protocol limits Remaining Length
749      */
750     private static long decodeVariableByteInteger(ByteBuf buffer) {
751         int remainingLength = 0;
752         int multiplier = 1;
753 
754         for (int i = 0; i < 4; i++) {
755             short digit = buffer.readUnsignedByte();
756             remainingLength += (digit & 127) * multiplier;
757 
758             if ((digit & 128) == 0) {
759                 return packInts(remainingLength, i + 1);
760             }
761 
762             multiplier *= 128;
763         }
764 
765         throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
766     }
767 
768     private static final class Result<T> {
769 
770         private final T value;
771         private final int numberOfBytesConsumed;
772 
773         Result(T value, int numberOfBytesConsumed) {
774             this.value = value;
775             this.numberOfBytesConsumed = numberOfBytesConsumed;
776         }
777     }
778 
779     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
780         final long propertiesLength = decodeVariableByteInteger(buffer);
781         int totalPropertiesLength = unpackA(propertiesLength);
782         int numberOfBytesConsumed = unpackB(propertiesLength);
783         if (buffer.readableBytes() < totalPropertiesLength) {
784             // Force an early REPLAY to avoid repeatedly parsing the properties.
785             buffer.readSlice(totalPropertiesLength);
786         }
787 
788         MqttProperties decodedProperties = new MqttProperties();
789         while (numberOfBytesConsumed < totalPropertiesLength) {
790             long propertyId = decodeVariableByteInteger(buffer);
791             final int propertyIdValue = unpackA(propertyId);
792             numberOfBytesConsumed += unpackB(propertyId);
793             switch (propertyIdValue) {
794                 case PAYLOAD_FORMAT_INDICATOR:
795                 case REQUEST_PROBLEM_INFORMATION:
796                 case REQUEST_RESPONSE_INFORMATION:
797                 case MAXIMUM_QOS:
798                 case RETAIN_AVAILABLE:
799                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
800                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
801                 case SHARED_SUBSCRIPTION_AVAILABLE:
802                     final int b1 = buffer.readUnsignedByte();
803                     numberOfBytesConsumed++;
804                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
805                     break;
806                 case SERVER_KEEP_ALIVE:
807                 case RECEIVE_MAXIMUM:
808                 case TOPIC_ALIAS_MAXIMUM:
809                 case TOPIC_ALIAS:
810                     final int int2BytesResult = decodeMsbLsb(buffer);
811                     numberOfBytesConsumed += 2;
812                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
813                     break;
814                 case PUBLICATION_EXPIRY_INTERVAL:
815                 case SESSION_EXPIRY_INTERVAL:
816                 case WILL_DELAY_INTERVAL:
817                 case MAXIMUM_PACKET_SIZE:
818                     final int maxPacketSize = buffer.readInt();
819                     numberOfBytesConsumed += 4;
820                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
821                     break;
822                 case SUBSCRIPTION_IDENTIFIER:
823                     long vbIntegerResult = decodeVariableByteInteger(buffer);
824                     numberOfBytesConsumed += unpackB(vbIntegerResult);
825                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
826                     break;
827                 case CONTENT_TYPE:
828                 case RESPONSE_TOPIC:
829                 case ASSIGNED_CLIENT_IDENTIFIER:
830                 case AUTHENTICATION_METHOD:
831                 case RESPONSE_INFORMATION:
832                 case SERVER_REFERENCE:
833                 case REASON_STRING:
834                     final Result<String> stringResult = decodeString(buffer);
835                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
836                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
837                     break;
838                 case USER_PROPERTY:
839                     final Result<String> keyResult = decodeString(buffer);
840                     final Result<String> valueResult = decodeString(buffer);
841                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
842                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
843                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
844                     break;
845                 case CORRELATION_DATA:
846                 case AUTHENTICATION_DATA:
847                     final byte[] binaryDataResult = decodeByteArray(buffer);
848                     numberOfBytesConsumed += binaryDataResult.length + 2;
849                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
850                     break;
851                 default:
852                     //shouldn't reach here
853                     throw new DecoderException("Unknown property type: " + propertyIdValue);
854             }
855         }
856 
857         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
858     }
859 }