View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.handler.codec.EncoderException;
25  import io.netty.handler.codec.MessageToMessageEncoder;
26  import io.netty.util.internal.EmptyArrays;
27  
28  import java.util.List;
29  
30  import static io.netty.buffer.ByteBufUtil.*;
31  import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35  import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
36  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
37  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
38  import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
39  import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
40  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
41  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
42  import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
43  import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
44  import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
45  import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
46  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
47  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
48  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
49  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
50  import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
51  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
52  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
53  import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
54  import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
55  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
56  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
57  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
58  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
59  import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
60  import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
61  import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
62  
63  /**
64   * Encodes Mqtt messages into bytes following the protocol specification v3.1
65   * as described here <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
66   * or v5.0 as described here <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">MQTTv5.0</a> -
67   * depending on the version specified in the first CONNECT message that goes through the channel.
68   */
69  @ChannelHandler.Sharable
70  public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
71  
72      public static final MqttEncoder INSTANCE = new MqttEncoder(); // the instance created here; the constructor below is private
73  
74      private MqttEncoder() { // private: not instantiable from outside this class
75          super(MqttMessage.class); // hands MqttMessage.class to the MessageToMessageEncoder constructor
76      }
77  
78      @Override
79      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
80          out.add(doEncode(ctx, msg));
81      }
82  
83      /**
84       * This is the main encoding method.
85       * It's only visible for testing.
86       *
87       * @param message MQTT message to encode
88       * @return ByteBuf with encoded bytes
89       */
90      static ByteBuf doEncode(ChannelHandlerContext ctx,
91                       MqttMessage message) {
92  
93          switch (message.fixedHeader().messageType()) {
94              case CONNECT:
95                  return encodeConnectMessage(ctx, (MqttConnectMessage) message);
96  
97              case CONNACK:
98                  return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
99  
100             case PUBLISH:
101                 return encodePublishMessage(ctx, (MqttPublishMessage) message);
102 
103             case SUBSCRIBE:
104                 return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
105 
106             case UNSUBSCRIBE:
107                 return encodeUnsubscribeMessage(ctx,  (MqttUnsubscribeMessage) message);
108 
109             case SUBACK:
110                 return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
111 
112             case UNSUBACK:
113                 if (message instanceof MqttUnsubAckMessage) {
114                     return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
115                 }
116                 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
117 
118             case PUBACK:
119             case PUBREC:
120             case PUBREL:
121             case PUBCOMP:
122                 return encodePubReplyMessage(ctx, message);
123 
124             case DISCONNECT:
125             case AUTH:
126                 return encodeReasonCodePlusPropertiesMessage(ctx, message);
127 
128             case PINGREQ:
129             case PINGRESP:
130                 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
131 
132             default:
133                 throw new IllegalArgumentException(
134                         "Unknown message type: " + message.fixedHeader().messageType().value());
135         }
136     }
137 
138     private static ByteBuf encodeConnectMessage(
139             ChannelHandlerContext ctx,
140             MqttConnectMessage message) {
141         int payloadBufferSize = 0;
142 
143         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
144         MqttConnectVariableHeader variableHeader = message.variableHeader();
145         MqttConnectPayload payload = message.payload();
146         MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
147                 (byte) variableHeader.version());
148         setMqttVersion(ctx, mqttVersion);
149 
150         // MQTT 3.1 and 3.1.1 require the Password Flag to be 0 when the User Name Flag is 0.
151         if ((mqttVersion == MqttVersion.MQTT_3_1 || mqttVersion == MqttVersion.MQTT_3_1_1) &&
152                 !variableHeader.hasUserName() && variableHeader.hasPassword()) {
153             throw new EncoderException("Without a username, the password MUST be not set");
154         }
155 
156         // Client id
157         String clientIdentifier = payload.clientIdentifier();
158         if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
159             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
160         }
161         int clientIdentifierBytes = utf8Bytes(clientIdentifier);
162         payloadBufferSize += 2 + clientIdentifierBytes;
163 
164         // Will topic and message
165         String willTopic = payload.willTopic();
166         int willTopicBytes = nullableUtf8Bytes(willTopic);
167         byte[] willMessage = payload.willMessageInBytes();
168         byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
169         if (variableHeader.isWillFlag()) {
170             payloadBufferSize += 2 + willTopicBytes;
171             payloadBufferSize += 2 + willMessageBytes.length;
172         }
173 
174         String userName = payload.userName();
175         int userNameBytes = nullableUtf8Bytes(userName);
176         if (variableHeader.hasUserName()) {
177             payloadBufferSize += 2 + userNameBytes;
178         }
179 
180         byte[] password = payload.passwordInBytes();
181         byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
182         if (variableHeader.hasPassword()) {
183             payloadBufferSize += 2 + passwordBytes.length;
184         }
185 
186         // Fixed and variable header
187         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
188         ByteBuf propertiesBuf = encodePropertiesIfNeeded(
189                 mqttVersion,
190                 ctx.alloc(),
191                 message.variableHeader().properties());
192         try {
193             final ByteBuf willPropertiesBuf;
194             if (variableHeader.isWillFlag()) {
195                 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
196                 payloadBufferSize += willPropertiesBuf.readableBytes();
197             } else {
198                 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
199             }
200             try {
201                 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
202 
203                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
204                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
205                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
206                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
207                 writeVariableLengthInt(buf, variablePartSize);
208 
209                 buf.writeShort(protocolNameBytes.length);
210                 buf.writeBytes(protocolNameBytes);
211 
212                 buf.writeByte(variableHeader.version());
213                 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
214                 buf.writeShort(variableHeader.keepAliveTimeSeconds());
215                 buf.writeBytes(propertiesBuf);
216 
217                 // Payload
218                 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
219                 if (variableHeader.isWillFlag()) {
220                     buf.writeBytes(willPropertiesBuf);
221                     writeExactUTF8String(buf, willTopic, willTopicBytes);
222                     buf.writeShort(willMessageBytes.length);
223                     buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
224                 }
225                 if (variableHeader.hasUserName()) {
226                     writeExactUTF8String(buf, userName, userNameBytes);
227                 }
228                 if (variableHeader.hasPassword()) {
229                     buf.writeShort(passwordBytes.length);
230                     buf.writeBytes(passwordBytes, 0, passwordBytes.length);
231                 }
232                 return buf;
233             } finally {
234                 willPropertiesBuf.release();
235             }
236         } finally {
237             propertiesBuf.release();
238         }
239     }
240 
241     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
242         int flagByte = 0;
243         if (variableHeader.hasUserName()) {
244             flagByte |= 0x80;
245         }
246         if (variableHeader.hasPassword()) {
247             flagByte |= 0x40;
248         }
249         if (variableHeader.isWillRetain()) {
250             flagByte |= 0x20;
251         }
252         flagByte |= (variableHeader.willQos() & 0x03) << 3;
253         if (variableHeader.isWillFlag()) {
254             flagByte |= 0x04;
255         }
256         if (variableHeader.isCleanSession()) {
257             flagByte |= 0x02;
258         }
259         return flagByte;
260     }
261 
262     private static ByteBuf encodeConnAckMessage(
263             ChannelHandlerContext ctx,
264             MqttConnAckMessage message) {
265         final MqttVersion mqttVersion = getMqttVersion(ctx);
266         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
267                 ctx.alloc(),
268                 message.variableHeader().properties());
269 
270         try {
271             ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
272             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
273             writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
274             buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
275             buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
276             buf.writeBytes(propertiesBuf);
277             return buf;
278         } finally {
279             propertiesBuf.release();
280         }
281     }
282 
283     private static ByteBuf encodeSubscribeMessage(
284             ChannelHandlerContext ctx,
285             MqttSubscribeMessage message) {
286         MqttVersion mqttVersion = getMqttVersion(ctx);
287         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
288                 ctx.alloc(),
289                 message.idAndPropertiesVariableHeader().properties());
290 
291         try {
292             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
293             int payloadBufferSize = 0;
294 
295             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
296             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
297             MqttSubscribePayload payload = message.payload();
298 
299             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
300                 String topicName = topic.topicName();
301                 int topicNameBytes = utf8Bytes(topicName);
302                 payloadBufferSize += 2 + topicNameBytes;
303                 payloadBufferSize += 1;
304             }
305 
306             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
307             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
308 
309             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
310             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
311             writeVariableLengthInt(buf, variablePartSize);
312 
313             // Variable Header
314             int messageId = variableHeader.messageId();
315             buf.writeShort(messageId);
316             buf.writeBytes(propertiesBuf);
317 
318             // Payload
319             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
320                 writeEagerUTF8String(buf, topic.topicName());
321                 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
322                     buf.writeByte(topic.qualityOfService().value());
323                 } else {
324                     final MqttSubscriptionOption option = topic.option();
325 
326                     int optionEncoded = option.retainHandling().value() << 4;
327                     if (option.isRetainAsPublished()) {
328                         optionEncoded |= 0x08;
329                     }
330                     if (option.isNoLocal()) {
331                         optionEncoded |= 0x04;
332                     }
333                     optionEncoded |= option.qos().value();
334 
335                     buf.writeByte(optionEncoded);
336                 }
337             }
338 
339             return buf;
340         } finally {
341             propertiesBuf.release();
342         }
343     }
344 
345     private static ByteBuf encodeUnsubscribeMessage(
346             ChannelHandlerContext ctx,
347             MqttUnsubscribeMessage message) {
348         MqttVersion mqttVersion = getMqttVersion(ctx);
349         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
350                 ctx.alloc(),
351                 message.idAndPropertiesVariableHeader().properties());
352 
353         try {
354             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
355             int payloadBufferSize = 0;
356 
357             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
358             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
359             MqttUnsubscribePayload payload = message.payload();
360 
361             for (String topicName : payload.topics()) {
362                 int topicNameBytes = utf8Bytes(topicName);
363                 payloadBufferSize += 2 + topicNameBytes;
364             }
365 
366             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
367             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
368 
369             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
370             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
371             writeVariableLengthInt(buf, variablePartSize);
372 
373             // Variable Header
374             int messageId = variableHeader.messageId();
375             buf.writeShort(messageId);
376             buf.writeBytes(propertiesBuf);
377 
378             // Payload
379             for (String topicName : payload.topics()) {
380                 writeEagerUTF8String(buf, topicName);
381             }
382 
383             return buf;
384         } finally {
385             propertiesBuf.release();
386         }
387     }
388 
389     private static ByteBuf encodeSubAckMessage(
390             ChannelHandlerContext ctx,
391             MqttSubAckMessage message) {
392         MqttVersion mqttVersion = getMqttVersion(ctx);
393         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
394                 ctx.alloc(),
395                 message.idAndPropertiesVariableHeader().properties());
396         try {
397             int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
398             int payloadBufferSize = message.payload().grantedQoSLevels().size();
399             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
400             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
401             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
402             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
403             writeVariableLengthInt(buf, variablePartSize);
404             buf.writeShort(message.variableHeader().messageId());
405             buf.writeBytes(propertiesBuf);
406             for (int code: message.payload().reasonCodes()) {
407                 buf.writeByte(code);
408             }
409 
410             return buf;
411         } finally {
412             propertiesBuf.release();
413         }
414     }
415 
416     private static ByteBuf encodeUnsubAckMessage(
417             ChannelHandlerContext ctx,
418             MqttUnsubAckMessage message) {
419         if (message.variableHeader() instanceof  MqttMessageIdAndPropertiesVariableHeader) {
420             MqttVersion mqttVersion = getMqttVersion(ctx);
421             ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
422                     ctx.alloc(),
423                     message.idAndPropertiesVariableHeader().properties());
424             try {
425                 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
426                 MqttUnsubAckPayload payload = message.payload();
427                 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
428                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
429                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
430                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
431                 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
432                 writeVariableLengthInt(buf, variablePartSize);
433                 buf.writeShort(message.variableHeader().messageId());
434                 buf.writeBytes(propertiesBuf);
435 
436                 if (payload != null) {
437                     for (Short reasonCode : payload.unsubscribeReasonCodes()) {
438                         buf.writeByte(reasonCode);
439                     }
440                 }
441 
442                 return buf;
443             } finally {
444                 propertiesBuf.release();
445             }
446         } else {
447             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
448         }
449     }
450 
451     private static ByteBuf encodePublishMessage(
452             ChannelHandlerContext ctx,
453             MqttPublishMessage message) {
454         MqttVersion mqttVersion = getMqttVersion(ctx);
455         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
456         MqttPublishVariableHeader variableHeader = message.variableHeader();
457         ByteBuf payload = message.payload().duplicate();
458 
459         String topicName = variableHeader.topicName();
460         int topicNameBytes = utf8Bytes(topicName);
461 
462         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
463                 ctx.alloc(),
464                 message.variableHeader().properties());
465 
466         try {
467             boolean qosLevelGreaterZero = mqttFixedHeader.qosLevel().value() > 0;
468             int variableHeaderBufferSize = 2 + topicNameBytes +
469                     (qosLevelGreaterZero ? 2 : 0) + propertiesBuf.readableBytes();
470             int payloadBufferSize = payload.readableBytes();
471             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
472             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
473 
474             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
475             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
476             writeVariableLengthInt(buf, variablePartSize);
477             writeExactUTF8String(buf, topicName, topicNameBytes);
478             if (qosLevelGreaterZero) {
479                 buf.writeShort(variableHeader.packetId());
480             }
481             buf.writeBytes(propertiesBuf);
482             buf.writeBytes(payload);
483 
484             return buf;
485         } finally {
486             propertiesBuf.release();
487         }
488     }
489 
490     private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
491                                           MqttMessage message) {
492         if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
493             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
494             MqttPubReplyMessageVariableHeader variableHeader =
495                     (MqttPubReplyMessageVariableHeader) message.variableHeader();
496             int msgId = variableHeader.messageId();
497 
498             final ByteBuf propertiesBuf;
499             final boolean includeReasonCode;
500             final int variableHeaderBufferSize;
501             final MqttVersion mqttVersion = getMqttVersion(ctx);
502             if (mqttVersion == MqttVersion.MQTT_5 &&
503                     (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
504                             !variableHeader.properties().isEmpty())) {
505                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
506                 includeReasonCode = true;
507                 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
508             } else {
509                 propertiesBuf = Unpooled.EMPTY_BUFFER;
510                 includeReasonCode = false;
511                 variableHeaderBufferSize = 2;
512             }
513 
514             try {
515                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
516                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
517                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
518                 writeVariableLengthInt(buf, variableHeaderBufferSize);
519                 buf.writeShort(msgId);
520                 if (includeReasonCode) {
521                     buf.writeByte(variableHeader.reasonCode());
522                 }
523                 buf.writeBytes(propertiesBuf);
524 
525                 return buf;
526             } finally {
527                 propertiesBuf.release();
528             }
529         } else {
530             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
531         }
532     }
533 
534     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
535             ByteBufAllocator byteBufAllocator,
536             MqttMessage message) {
537         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
538         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
539         int msgId = variableHeader.messageId();
540 
541         int variableHeaderBufferSize = 2; // variable part only has a message id
542         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
543         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
544         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
545         writeVariableLengthInt(buf, variableHeaderBufferSize);
546         buf.writeShort(msgId);
547 
548         return buf;
549     }
550 
551     private static ByteBuf encodeReasonCodePlusPropertiesMessage(
552             ChannelHandlerContext ctx,
553             MqttMessage message) {
554         if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
555             MqttVersion mqttVersion = getMqttVersion(ctx);
556             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
557             MqttReasonCodeAndPropertiesVariableHeader variableHeader =
558                     (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
559 
560             final ByteBuf propertiesBuf;
561             final boolean includeReasonCode;
562             final int variableHeaderBufferSize;
563             if (mqttVersion == MqttVersion.MQTT_5 &&
564                     (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
565                             !variableHeader.properties().isEmpty())) {
566                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
567                 includeReasonCode = true;
568                 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
569             } else {
570                 propertiesBuf = Unpooled.EMPTY_BUFFER;
571                 includeReasonCode = false;
572                 variableHeaderBufferSize = 0;
573             }
574 
575             try {
576                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
577                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
578                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
579                 writeVariableLengthInt(buf, variableHeaderBufferSize);
580                 if (includeReasonCode) {
581                     buf.writeByte(variableHeader.reasonCode());
582                 }
583                 buf.writeBytes(propertiesBuf);
584 
585                 return buf;
586             } finally {
587                 propertiesBuf.release();
588             }
589         } else {
590             return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
591         }
592     }
593 
594     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
595             ByteBufAllocator byteBufAllocator,
596             MqttMessage message) {
597         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
598         ByteBuf buf = byteBufAllocator.buffer(2);
599         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
600         buf.writeByte(0);
601 
602         return buf;
603     }
604 
605     private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
606                                              ByteBufAllocator byteBufAllocator,
607                                              MqttProperties mqttProperties) {
608         if (mqttVersion == MqttVersion.MQTT_5) {
609             return encodeProperties(byteBufAllocator, mqttProperties);
610         }
611         return Unpooled.EMPTY_BUFFER;
612     }
613 
614     private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
615                                             MqttProperties mqttProperties) {
616         ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
617         // encode also the Properties part
618         try {
619             ByteBuf propertiesBuf = byteBufAllocator.buffer();
620             try {
621                 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
622                     int propertyId = property.propertyId;
623                     switch (propertyId) {
624                         case PAYLOAD_FORMAT_INDICATOR:
625                         case REQUEST_PROBLEM_INFORMATION:
626                         case REQUEST_RESPONSE_INFORMATION:
627                         case MAXIMUM_QOS:
628                         case RETAIN_AVAILABLE:
629                         case WILDCARD_SUBSCRIPTION_AVAILABLE:
630                         case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
631                         case SHARED_SUBSCRIPTION_AVAILABLE:
632                             writeVariableLengthInt(propertiesBuf, propertyId);
633                             final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
634                             propertiesBuf.writeByte(bytePropValue);
635                             break;
636                         case SERVER_KEEP_ALIVE:
637                         case RECEIVE_MAXIMUM:
638                         case TOPIC_ALIAS_MAXIMUM:
639                         case TOPIC_ALIAS:
640                             writeVariableLengthInt(propertiesBuf, propertyId);
641                             final short twoBytesInPropValue =
642                                     ((MqttProperties.IntegerProperty) property).value.shortValue();
643                             propertiesBuf.writeShort(twoBytesInPropValue);
644                             break;
645                         case PUBLICATION_EXPIRY_INTERVAL:
646                         case SESSION_EXPIRY_INTERVAL:
647                         case WILL_DELAY_INTERVAL:
648                         case MAXIMUM_PACKET_SIZE:
649                             writeVariableLengthInt(propertiesBuf, propertyId);
650                             final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
651                             propertiesBuf.writeInt(fourBytesIntPropValue);
652                             break;
653                         case SUBSCRIPTION_IDENTIFIER:
654                             writeVariableLengthInt(propertiesBuf, propertyId);
655                             final int vbi = ((MqttProperties.IntegerProperty) property).value;
656                             writeVariableLengthInt(propertiesBuf, vbi);
657                             break;
658                         case CONTENT_TYPE:
659                         case RESPONSE_TOPIC:
660                         case ASSIGNED_CLIENT_IDENTIFIER:
661                         case AUTHENTICATION_METHOD:
662                         case RESPONSE_INFORMATION:
663                         case SERVER_REFERENCE:
664                         case REASON_STRING:
665                             writeVariableLengthInt(propertiesBuf, propertyId);
666                             writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
667                             break;
668                         case USER_PROPERTY:
669                             final List<MqttProperties.StringPair> pairs =
670                                     ((MqttProperties.UserProperties) property).value;
671                             for (MqttProperties.StringPair pair : pairs) {
672                                 writeVariableLengthInt(propertiesBuf, propertyId);
673                                 writeEagerUTF8String(propertiesBuf, pair.key);
674                                 writeEagerUTF8String(propertiesBuf, pair.value);
675                             }
676                             break;
677                         case CORRELATION_DATA:
678                         case AUTHENTICATION_DATA:
679                             writeVariableLengthInt(propertiesBuf, propertyId);
680                             final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
681                             propertiesBuf.writeShort(binaryPropValue.length);
682                             propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
683                             break;
684                         default:
685                             //shouldn't reach here
686                             throw new EncoderException("Unknown property type: " + propertyId);
687                     }
688                 }
689                 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
690                 propertiesHeaderBuf.writeBytes(propertiesBuf);
691 
692                 return propertiesHeaderBuf;
693             } finally {
694                 propertiesBuf.release();
695             }
696         } catch (RuntimeException e) {
697             propertiesHeaderBuf.release();
698             throw e;
699         }
700     }
701 
702     private static int getFixedHeaderByte1(MqttFixedHeader header) {
703         int ret = 0;
704         ret |= header.messageType().value() << 4;
705         if (header.isDup()) {
706             ret |= 0x08;
707         }
708         ret |= header.qosLevel().value() << 1;
709         if (header.isRetain()) {
710             ret |= 0x01;
711         }
712         return ret;
713     }
714 
715     private static void writeVariableLengthInt(ByteBuf buf, int num) {
716         do {
717             int digit = num & 0x7F;
718             num >>>= 7;
719             if (num > 0) {
720                 digit |= 0x80;
721             }
722             buf.writeByte(digit);
723         } while (num > 0);
724     }
725 
726     private static int nullableUtf8Bytes(String s) {
727         return s == null? 0 : utf8Bytes(s);
728     }
729 
730     private static int nullableMaxUtf8Bytes(String s) {
731         return s == null? 0 : utf8MaxBytes(s);
732     }
733 
734     private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
735         buf.ensureWritable(utf8Length + 2);
736         buf.writeShort(utf8Length);
737         if (utf8Length > 0) {
738             final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
739             assert writtenUtf8Length == utf8Length;
740         }
741     }
742 
743     private static void writeEagerUTF8String(ByteBuf buf, String s) {
744         final int maxUtf8Length = nullableMaxUtf8Bytes(s);
745         buf.ensureWritable(maxUtf8Length + 2);
746         final int writerIndex = buf.writerIndex();
747         final int startUtf8String = writerIndex + 2;
748         buf.writerIndex(startUtf8String);
749         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
750         buf.setShort(writerIndex, utf8Length);
751     }
752 
753     private static int getVariableLengthInt(int num) {
754         if (num < 128) {
755             return 1;
756         }
757         if (num < 16_384) { // 128 * 128
758             return 2;
759         }
760         if (num < 2_097_152) { // 128 * 128 * 128
761             return 3;
762         }
763         return 4;
764     }
765 
766 }