View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.ReplayingDecoder;
23  import io.netty.handler.codec.TooLongFrameException;
24  import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25  import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26  import io.netty.util.CharsetUtil;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35  import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36  import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39  import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
40  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
41  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
42  import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
43  import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
44  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
45  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
46  import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
47  import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
48  import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
49  import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
50  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
51  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
52  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
53  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
54  import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
55  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
56  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
57  import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
58  import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
59  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
60  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
61  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
62  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
63  import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
64  import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
65  import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
66  import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
67  
/**
 * Decodes MQTT messages from bytes, following the MQTT protocol specification
 * <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>,
 * <a href="https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html">v3.1.1</a>
 * or
 * <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
 * version specified in the CONNECT message that first goes through the channel.
 */
76  public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
77  
    /**
     * States of the decoder.
     * We start at READ_FIXED_HEADER, followed by
     * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
     * BAD_MESSAGE is entered as soon as any of those steps fails; in that state all
     * further input is discarded.
     */
    enum DecoderState {
        /** Initial state: the fixed header (type/flags byte and remaining length) is read next. */
        READ_FIXED_HEADER,
        /** The fixed header is known; the message-type specific variable header is read next. */
        READ_VARIABLE_HEADER,
        /** Fixed and variable header are known; the payload is read next and the message is emitted. */
        READ_PAYLOAD,
        /** A decoding error occurred; every readable byte is skipped from now on. */
        BAD_MESSAGE,
    }
89  
    // Fixed header of the message currently being decoded; set back to null after a message was emitted.
    private MqttFixedHeader mqttFixedHeader;
    // Variable header of the message currently being decoded; set back to null after a message was emitted.
    private Object variableHeader;
    // Initialised from the fixed header's remaining length and decremented by the header/payload decoders.
    private int bytesRemainingInVariablePart;

    // Largest accepted remaining length; bigger messages are reported with a TooLongFrameException.
    private final int maxBytesInMessage;
    // Limit handed to the client identifier validation when a CONNECT payload is decoded.
    private final int maxClientIdLength;
96  
97      public MqttDecoder() {
98        this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
99      }
100 
101     public MqttDecoder(int maxBytesInMessage) {
102         this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
103     }
104 
105     public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
106         super(DecoderState.READ_FIXED_HEADER);
107         this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
108         this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
109     }
110 
    /**
     * Runs the decoding state machine: fixed header, then variable header, then payload. The cases
     * deliberately fall through so a complete message can be decoded in a single call.
     *
     * Any {@link Exception} raised by one of the three steps is turned into an invalid message
     * (see {@code invalidMessage}), which is added to {@code out} and switches the decoder to
     * {@link DecoderState#BAD_MESSAGE}.
     *
     * @param ctx the handler context, passed on to the header and payload decoders
     * @param buffer the buffer to decode from
     * @param out receives the decoded (or invalid) message
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER: try {
                mqttFixedHeader = decodeFixedHeader(ctx, buffer);
                // Everything after the fixed header still has to be consumed by the steps below.
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_VARIABLE_HEADER:  try {
                // Captured up front because decodeVariableHeader decrements bytesRemainingInVariablePart.
                int bytesRemainingBeforeVariableHeader = bytesRemainingInVariablePart;
                variableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
                // The size limit is enforced only after the variable header was decoded, so the invalid
                // message produced below still carries that variable header.
                if (bytesRemainingBeforeVariableHeader > maxBytesInMessage) {
                    buffer.skipBytes(actualReadableBytes());
                    throw new TooLongFrameException("message length exceeds " + maxBytesInMessage + ": "
                            + bytesRemainingBeforeVariableHeader);
                }
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_PAYLOAD: try {
                final Object decodedPayload =
                        decodePayload(
                                ctx,
                                buffer,
                                mqttFixedHeader.messageType(),
                                maxClientIdLength,
                                variableHeader);
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload);
                // Clear the per-message state before the next message starts.
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }
169 
170     private MqttMessage invalidMessage(Throwable cause) {
171       checkpoint(DecoderState.BAD_MESSAGE);
172       return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
173     }
174 
175     /**
176      * Decodes the fixed header. It's one byte for the flags and then variable
177      * bytes for the remaining length.
178      *
179      * @see
180      * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180841
181      *
182      * @param buffer the buffer to decode from
183      * @return the fixed header
184      */
185     private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
186         short b1 = buffer.readUnsignedByte();
187 
188         MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
189         boolean dupFlag = (b1 & 0x08) == 0x08;
190         int qosLevel = (b1 & 0x06) >> 1;
191         boolean retain = (b1 & 0x01) != 0;
192 
193         switch (messageType) {
194             case PUBLISH:
195                 if (qosLevel == 3) {
196                     throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
197                             + qosLevel + ')');
198                 }
199                 break;
200 
201             case PUBREL:
202             case SUBSCRIBE:
203             case UNSUBSCRIBE:
204                 if (dupFlag) {
205                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
206                             + " message, must be 0, found 1");
207                 }
208                 if (qosLevel != 1) {
209                     throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
210                             + " message, must be 1, found " + qosLevel);
211                 }
212                 if (retain) {
213                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
214                             + " message, must be 0, found 1");
215                 }
216                 break;
217 
218             case AUTH:
219             case CONNACK:
220             case CONNECT:
221             case DISCONNECT:
222             case PINGREQ:
223             case PINGRESP:
224             case PUBACK:
225             case PUBCOMP:
226             case PUBREC:
227             case SUBACK:
228             case UNSUBACK:
229                 if (dupFlag) {
230                     throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
231                             + " message, must be 0, found 1");
232                 }
233                 if (qosLevel != 0) {
234                     throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
235                             + " message, must be 0, found " + qosLevel);
236                 }
237                 if (retain) {
238                     throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
239                             + " message, must be 0, found 1");
240                 }
241                 break;
242             default:
243                 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
244         }
245 
246         int remainingLength = parseRemainingLength(buffer, messageType);
247         MqttFixedHeader decodedFixedHeader =
248                 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
249         return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
250     }
251 
252     private static int parseRemainingLength(ByteBuf buffer, MqttMessageType messageType) {
253         int remainingLength = 0;
254         int multiplier = 1;
255 
256         for (int i = 0; i < 4; i++) {
257             short digit = buffer.readUnsignedByte();
258             remainingLength += (digit & 127) * multiplier;
259 
260             if ((digit & 128) == 0) {
261                 return remainingLength;
262             }
263 
264             multiplier *= 128;
265         }
266 
267         // MQTT protocol limits Remaining Length to 4 bytes
268         throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
269     }
270 
271     /**
272      * Decodes the variable header (if any)
273      * @param buffer the buffer to decode from
274      * @param mqttFixedHeader MqttFixedHeader of the same message
275      * @return the variable header
276      */
277     private Object decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
278         switch (mqttFixedHeader.messageType()) {
279             case CONNECT:
280                 return decodeConnectionVariableHeader(ctx, buffer);
281 
282             case CONNACK:
283                 return decodeConnAckVariableHeader(ctx, buffer);
284 
285             case UNSUBSCRIBE:
286             case SUBSCRIBE:
287             case SUBACK:
288             case UNSUBACK:
289                 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
290 
291             case PUBACK:
292             case PUBREC:
293             case PUBCOMP:
294             case PUBREL:
295                 return decodePubReplyMessage(buffer);
296 
297             case PUBLISH:
298                 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
299 
300             case DISCONNECT:
301             case AUTH:
302                 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
303 
304             case PINGREQ:
305             case PINGRESP:
306                 // Empty variable header
307                 return null;
308             default:
309                 //shouldn't reach here
310                 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
311         }
312     }
313 
314     private MqttConnectVariableHeader decodeConnectionVariableHeader(
315             ChannelHandlerContext ctx,
316             ByteBuf buffer) {
317         final Result<String> protoString = decodeString(buffer);
318         int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
319 
320         final byte protocolLevel = buffer.readByte();
321         numberOfBytesConsumed += 1;
322 
323         MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
324         MqttCodecUtil.setMqttVersion(ctx, version);
325 
326         final int b1 = buffer.readUnsignedByte();
327         numberOfBytesConsumed += 1;
328 
329         final int keepAlive = decodeMsbLsb(buffer);
330         numberOfBytesConsumed += 2;
331 
332         final boolean hasUserName = (b1 & 0x80) == 0x80;
333         final boolean hasPassword = (b1 & 0x40) == 0x40;
334         final boolean willRetain = (b1 & 0x20) == 0x20;
335         final int willQos = (b1 & 0x18) >> 3;
336         final boolean willFlag = (b1 & 0x04) == 0x04;
337         final boolean cleanSession = (b1 & 0x02) == 0x02;
338         if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
339             final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
340             if (!zeroReservedFlag) {
341                 // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
342                 // set to zero and disconnect the Client if it is not zero.
343                 // See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
344                 throw new DecoderException("non-zero reserved flag");
345             }
346         }
347 
348         final MqttProperties properties;
349         if (version == MqttVersion.MQTT_5) {
350             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
351             properties = propertiesResult.value;
352             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
353         } else {
354             properties = MqttProperties.NO_PROPERTIES;
355         }
356 
357         bytesRemainingInVariablePart -= numberOfBytesConsumed;
358         return new MqttConnectVariableHeader(
359                 version.protocolName(),
360                 version.protocolLevel(),
361                 hasUserName,
362                 hasPassword,
363                 willRetain,
364                 willQos,
365                 willFlag,
366                 cleanSession,
367                 keepAlive,
368                 properties);
369     }
370 
371     private MqttConnAckVariableHeader decodeConnAckVariableHeader(
372             ChannelHandlerContext ctx,
373             ByteBuf buffer) {
374         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
375         final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
376         byte returnCode = buffer.readByte();
377 
378         final MqttProperties properties;
379         if (mqttVersion == MqttVersion.MQTT_5) {
380             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
381             properties = propertiesResult.value;
382             bytesRemainingInVariablePart -= 2 + propertiesResult.numberOfBytesConsumed;
383         } else {
384             properties = MqttProperties.NO_PROPERTIES;
385             bytesRemainingInVariablePart -= 2;
386         }
387 
388         return new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
389     }
390 
391     private MqttMessageIdAndPropertiesVariableHeader decodeMessageIdAndPropertiesVariableHeader(
392             ChannelHandlerContext ctx,
393             ByteBuf buffer) {
394         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
395         final int packetId = decodeMessageId(buffer);
396 
397         if (mqttVersion == MqttVersion.MQTT_5) {
398             final Result<MqttProperties> properties = decodeProperties(buffer);
399             bytesRemainingInVariablePart -= 2 + properties.numberOfBytesConsumed;
400             return new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
401         } else {
402             bytesRemainingInVariablePart -= 2;
403             return new MqttMessageIdAndPropertiesVariableHeader(packetId,
404                                                                 MqttProperties.NO_PROPERTIES);
405         }
406     }
407 
408     private MqttPubReplyMessageVariableHeader decodePubReplyMessage(ByteBuf buffer) {
409         final int packetId = decodeMessageId(buffer);
410 
411         final int packetIdNumberOfBytesConsumed = 2;
412         if (bytesRemainingInVariablePart > 3) {
413             final byte reasonCode = buffer.readByte();
414             final Result<MqttProperties> properties = decodeProperties(buffer);
415             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
416             return new MqttPubReplyMessageVariableHeader(packetId,
417                     reasonCode,
418                     properties.value);
419         } else if (bytesRemainingInVariablePart > 2) {
420             final byte reasonCode = buffer.readByte();
421             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed + 1;
422             return new MqttPubReplyMessageVariableHeader(packetId,
423                     reasonCode,
424                     MqttProperties.NO_PROPERTIES);
425         } else {
426             bytesRemainingInVariablePart -= packetIdNumberOfBytesConsumed;
427             return new MqttPubReplyMessageVariableHeader(packetId,
428                     (byte) 0,
429                     MqttProperties.NO_PROPERTIES);
430         }
431     }
432 
433     private MqttReasonCodeAndPropertiesVariableHeader decodeReasonCodeAndPropertiesVariableHeader(
434             ByteBuf buffer) {
435         final byte reasonCode;
436         final MqttProperties properties;
437         if (bytesRemainingInVariablePart > 1) {
438             reasonCode = buffer.readByte();
439             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
440             properties = propertiesResult.value;
441             bytesRemainingInVariablePart -= 1 + propertiesResult.numberOfBytesConsumed;
442         } else if (bytesRemainingInVariablePart > 0) {
443             reasonCode = buffer.readByte();
444             properties = MqttProperties.NO_PROPERTIES;
445             --bytesRemainingInVariablePart;
446         } else {
447             reasonCode = 0;
448             properties = MqttProperties.NO_PROPERTIES;
449         }
450 
451         return new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
452     }
453 
454     private MqttPublishVariableHeader decodePublishVariableHeader(
455             ChannelHandlerContext ctx,
456             ByteBuf buffer,
457             MqttFixedHeader mqttFixedHeader) {
458         final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
459         final Result<String> decodedTopic = decodeString(buffer);
460         if (!isValidPublishTopicName(decodedTopic.value)) {
461             throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
462         }
463         int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
464 
465         int messageId = -1;
466         if (mqttFixedHeader.qosLevel().value() > 0) {
467             messageId = decodeMessageId(buffer);
468             numberOfBytesConsumed += 2;
469         }
470 
471         final MqttProperties properties;
472         if (mqttVersion == MqttVersion.MQTT_5) {
473             final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
474             properties = propertiesResult.value;
475             numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
476         } else {
477             properties = MqttProperties.NO_PROPERTIES;
478         }
479 
480         bytesRemainingInVariablePart -= numberOfBytesConsumed;
481         return new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
482     }
483 
484     /**
485      * @return messageId with numberOfBytesConsumed is 2
486      */
487     private static int decodeMessageId(ByteBuf buffer) {
488         final int messageId = decodeMsbLsb(buffer);
489         if (!isValidMessageId(messageId)) {
490             throw new DecoderException("invalid messageId: " + messageId);
491         }
492         return messageId;
493     }
494 
495     /**
496      * Decodes the payload.
497      *
498      * @param buffer the buffer to decode from
499      * @param messageType  type of the message being decoded
500      * @param variableHeader variable header of the same message
501      * @return the payload
502      */
503     private Object decodePayload(
504             ChannelHandlerContext ctx,
505             ByteBuf buffer,
506             MqttMessageType messageType,
507             int maxClientIdLength,
508             Object variableHeader) {
509         switch (messageType) {
510             case CONNECT:
511                 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
512 
513             case SUBSCRIBE:
514                 return decodeSubscribePayload(buffer);
515 
516             case SUBACK:
517                 return decodeSubackPayload(buffer);
518 
519             case UNSUBSCRIBE:
520                 return decodeUnsubscribePayload(buffer);
521 
522             case UNSUBACK:
523                 return decodeUnsubAckPayload(ctx, buffer);
524 
525             case PUBLISH:
526                 return decodePublishPayload(buffer);
527 
528             default:
529                 // unknown payload , no byte consumed
530                 return null;
531         }
532     }
533 
534     private MqttConnectPayload decodeConnectionPayload(
535             ByteBuf buffer,
536             int maxClientIdLength,
537             MqttConnectVariableHeader mqttConnectVariableHeader) {
538         final Result<String> decodedClientId = decodeString(buffer);
539         final String decodedClientIdValue = decodedClientId.value;
540         final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
541                 (byte) mqttConnectVariableHeader.version());
542         if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
543             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
544         }
545         int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
546 
547         Result<String> decodedWillTopic = null;
548         byte[] decodedWillMessage = null;
549 
550         final MqttProperties willProperties;
551         if (mqttConnectVariableHeader.isWillFlag()) {
552             if (mqttVersion == MqttVersion.MQTT_5) {
553                 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
554                 willProperties = propertiesResult.value;
555                 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
556             } else {
557                 willProperties = MqttProperties.NO_PROPERTIES;
558             }
559             decodedWillTopic = decodeString(buffer, 0, 32767);
560             numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
561             decodedWillMessage = decodeByteArray(buffer);
562             numberOfBytesConsumed += decodedWillMessage.length + 2;
563         } else {
564             willProperties = MqttProperties.NO_PROPERTIES;
565         }
566         Result<String> decodedUserName = null;
567         byte[] decodedPassword = null;
568         if (mqttConnectVariableHeader.hasUserName()) {
569             decodedUserName = decodeString(buffer);
570             numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
571         }
572         if (mqttConnectVariableHeader.hasPassword()) {
573             decodedPassword = decodeByteArray(buffer);
574             numberOfBytesConsumed += decodedPassword.length + 2;
575         }
576 
577         validateNoBytesRemain(numberOfBytesConsumed);
578         return new MqttConnectPayload(
579                         decodedClientId.value,
580                         willProperties,
581                         decodedWillTopic != null ? decodedWillTopic.value : null,
582                         decodedWillMessage,
583                         decodedUserName != null ? decodedUserName.value : null,
584                         decodedPassword);
585     }
586 
587     private MqttSubscribePayload decodeSubscribePayload(
588             ByteBuf buffer) {
589         final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
590         int numberOfBytesConsumed = 0;
591         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
592             final Result<String> decodedTopicName = decodeString(buffer);
593             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
594             //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
595             final short optionByte = buffer.readUnsignedByte();
596 
597             MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
598             boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
599             boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
600             RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
601 
602             final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
603                     noLocal,
604                     retainAsPublished,
605                     retainHandling);
606 
607             numberOfBytesConsumed++;
608             subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
609         }
610         validateNoBytesRemain(numberOfBytesConsumed);
611         return new MqttSubscribePayload(subscribeTopics);
612     }
613 
614     private MqttSubAckPayload decodeSubackPayload(
615             ByteBuf buffer) {
616         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
617         final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
618         int numberOfBytesConsumed = 0;
619         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
620             int reasonCode = buffer.readUnsignedByte();
621             numberOfBytesConsumed++;
622             grantedQos.add(reasonCode);
623         }
624         validateNoBytesRemain(numberOfBytesConsumed);
625         return new MqttSubAckPayload(grantedQos);
626     }
627 
628     private MqttUnsubAckPayload decodeUnsubAckPayload(
629         ChannelHandlerContext ctx,
630         ByteBuf buffer) {
631         int bytesRemainingInVariablePart = this.bytesRemainingInVariablePart;
632         final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
633         int numberOfBytesConsumed = 0;
634         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
635             short reasonCode = buffer.readUnsignedByte();
636             numberOfBytesConsumed++;
637             reasonCodes.add(reasonCode);
638         }
639         validateNoBytesRemain(numberOfBytesConsumed);
640         return new MqttUnsubAckPayload(reasonCodes);
641     }
642 
643     private MqttUnsubscribePayload decodeUnsubscribePayload(
644             ByteBuf buffer) {
645         final List<String> unsubscribeTopics = new ArrayList<String>();
646         int numberOfBytesConsumed = 0;
647         while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
648             final Result<String> decodedTopicName = decodeString(buffer);
649             numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
650             unsubscribeTopics.add(decodedTopicName.value);
651         }
652         validateNoBytesRemain(numberOfBytesConsumed);
653         return new MqttUnsubscribePayload(unsubscribeTopics);
654     }
655 
656     private ByteBuf decodePublishPayload(ByteBuf buffer) {
657         return buffer.readRetainedSlice(bytesRemainingInVariablePart);
658     }
659 
660     private void validateNoBytesRemain(int numberOfBytesConsumed) {
661         bytesRemainingInVariablePart -= numberOfBytesConsumed;
662         if (bytesRemainingInVariablePart != 0) {
663             throw new DecoderException(
664                     "non-zero remaining payload bytes: " +
665                     bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
666         }
667     }
668 
669     private static Result<String> decodeString(ByteBuf buffer) {
670         return decodeString(buffer, 0, Integer.MAX_VALUE);
671     }
672 
673     private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
674         int size = decodeMsbLsb(buffer);
675         int numberOfBytesConsumed = 2;
676         if (size < minBytes || size > maxBytes) {
677             buffer.skipBytes(size);
678             numberOfBytesConsumed += size;
679             return new Result<String>(null, numberOfBytesConsumed);
680         }
681         String s = buffer.readString(size, CharsetUtil.UTF_8);
682         numberOfBytesConsumed += size;
683         return new Result<String>(s, numberOfBytesConsumed);
684     }
685 
686     /**
687      *
688      * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
689      */
690     private static byte[] decodeByteArray(ByteBuf buffer) {
691         int size = decodeMsbLsb(buffer);
692         byte[] bytes = new byte[size];
693         buffer.readBytes(bytes);
694         return bytes;
695     }
696 
697     // packing utils to reduce the amount of garbage while decoding ints
698     private static long packInts(int a, int b) {
699         return (((long) a) << 32) | (b & 0xFFFFFFFFL);
700     }
701 
702     private static int unpackA(long ints) {
703         return (int) (ints >> 32);
704     }
705 
706     private static int unpackB(long ints) {
707         return (int) ints;
708     }
709 
710     /**
711      *  numberOfBytesConsumed = 2. return decoded result.
712      */
713     private static int decodeMsbLsb(ByteBuf buffer) {
714         int min = 0;
715         int max = 65535;
716         short msbSize = buffer.readUnsignedByte();
717         short lsbSize = buffer.readUnsignedByte();
718         int result = msbSize << 8 | lsbSize;
719         if (result < min || result > max) {
720             result = -1;
721         }
722         return result;
723     }
724 
725     /**
726      * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
727      *
728      * @param buffer the buffer to decode from
729      * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
730      * @throws DecoderException if bad MQTT protocol limits Remaining Length
731      */
732     private static long decodeVariableByteInteger(ByteBuf buffer) {
733         int remainingLength = 0;
734         int multiplier = 1;
735 
736         for (int i = 0; i < 4; i++) {
737             short digit = buffer.readUnsignedByte();
738             remainingLength += (digit & 127) * multiplier;
739 
740             if ((digit & 128) == 0) {
741                 return packInts(remainingLength, i + 1);
742             }
743 
744             multiplier *= 128;
745         }
746 
747         throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
748     }
749 
750     private static final class Result<T> {
751 
752         private final T value;
753         private final int numberOfBytesConsumed;
754 
755         Result(T value, int numberOfBytesConsumed) {
756             this.value = value;
757             this.numberOfBytesConsumed = numberOfBytesConsumed;
758         }
759     }
760 
761     private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
762         final long propertiesLength = decodeVariableByteInteger(buffer);
763         int totalPropertiesLength = unpackA(propertiesLength);
764         int numberOfBytesConsumed = unpackB(propertiesLength);
765 
766         MqttProperties decodedProperties = new MqttProperties();
767         while (numberOfBytesConsumed < totalPropertiesLength) {
768             long propertyId = decodeVariableByteInteger(buffer);
769             final int propertyIdValue = unpackA(propertyId);
770             numberOfBytesConsumed += unpackB(propertyId);
771             switch (propertyIdValue) {
772                 case PAYLOAD_FORMAT_INDICATOR:
773                 case REQUEST_PROBLEM_INFORMATION:
774                 case REQUEST_RESPONSE_INFORMATION:
775                 case MAXIMUM_QOS:
776                 case RETAIN_AVAILABLE:
777                 case WILDCARD_SUBSCRIPTION_AVAILABLE:
778                 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
779                 case SHARED_SUBSCRIPTION_AVAILABLE:
780                     final int b1 = buffer.readUnsignedByte();
781                     numberOfBytesConsumed++;
782                     decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
783                     break;
784                 case SERVER_KEEP_ALIVE:
785                 case RECEIVE_MAXIMUM:
786                 case TOPIC_ALIAS_MAXIMUM:
787                 case TOPIC_ALIAS:
788                     final int int2BytesResult = decodeMsbLsb(buffer);
789                     numberOfBytesConsumed += 2;
790                     decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
791                     break;
792                 case PUBLICATION_EXPIRY_INTERVAL:
793                 case SESSION_EXPIRY_INTERVAL:
794                 case WILL_DELAY_INTERVAL:
795                 case MAXIMUM_PACKET_SIZE:
796                     final int maxPacketSize = buffer.readInt();
797                     numberOfBytesConsumed += 4;
798                     decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
799                     break;
800                 case SUBSCRIPTION_IDENTIFIER:
801                     long vbIntegerResult = decodeVariableByteInteger(buffer);
802                     numberOfBytesConsumed += unpackB(vbIntegerResult);
803                     decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
804                     break;
805                 case CONTENT_TYPE:
806                 case RESPONSE_TOPIC:
807                 case ASSIGNED_CLIENT_IDENTIFIER:
808                 case AUTHENTICATION_METHOD:
809                 case RESPONSE_INFORMATION:
810                 case SERVER_REFERENCE:
811                 case REASON_STRING:
812                     final Result<String> stringResult = decodeString(buffer);
813                     numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
814                     decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
815                     break;
816                 case USER_PROPERTY:
817                     final Result<String> keyResult = decodeString(buffer);
818                     final Result<String> valueResult = decodeString(buffer);
819                     numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
820                     numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
821                     decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
822                     break;
823                 case CORRELATION_DATA:
824                 case AUTHENTICATION_DATA:
825                     final byte[] binaryDataResult = decodeByteArray(buffer);
826                     numberOfBytesConsumed += binaryDataResult.length + 2;
827                     decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
828                     break;
829                 default:
830                     //shouldn't reach here
831                     throw new DecoderException("Unknown property type: " + propertyIdValue);
832             }
833         }
834 
835         return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
836     }
837 }