View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.handler.codec.EncoderException;
25  import io.netty.handler.codec.MessageToMessageEncoder;
26  import io.netty.util.internal.EmptyArrays;
27  
28  import java.util.List;
29  
30  import static io.netty.buffer.ByteBufUtil.*;
31  import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35  import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
36  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
37  import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
38  import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
39  import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
40  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
41  import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
42  import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
43  import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
44  import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
45  import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
46  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
47  import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
48  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
49  import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
50  import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
51  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
52  import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
53  import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
54  import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
55  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
56  import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
57  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
58  import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
59  import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
60  import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
61  import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
62  
63  /**
64   * Encodes Mqtt messages into bytes following the protocol specification v3.1
65   * as described here <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
66   * or v5.0 as described here <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">MQTTv5.0</a> -
67   * depending on the version specified in the first CONNECT message that goes through the channel.
68   */
69  @ChannelHandler.Sharable
70  public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
71  
72      public static final MqttEncoder INSTANCE = new MqttEncoder();
73  
74      private MqttEncoder() {
75          super(MqttMessage.class);
76      }
77  
78      @Override
79      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
80          out.add(doEncode(ctx, msg));
81      }
82  
83      /**
84       * This is the main encoding method.
85       * It's only visible for testing.
86       *
87       * @param message MQTT message to encode
88       * @return ByteBuf with encoded bytes
89       */
90      static ByteBuf doEncode(ChannelHandlerContext ctx,
91                       MqttMessage message) {
92  
93          switch (message.fixedHeader().messageType()) {
94              case CONNECT:
95                  return encodeConnectMessage(ctx, (MqttConnectMessage) message);
96  
97              case CONNACK:
98                  return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
99  
100             case PUBLISH:
101                 return encodePublishMessage(ctx, (MqttPublishMessage) message);
102 
103             case SUBSCRIBE:
104                 return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
105 
106             case UNSUBSCRIBE:
107                 return encodeUnsubscribeMessage(ctx,  (MqttUnsubscribeMessage) message);
108 
109             case SUBACK:
110                 return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
111 
112             case UNSUBACK:
113                 if (message instanceof MqttUnsubAckMessage) {
114                     return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
115                 }
116                 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
117 
118             case PUBACK:
119             case PUBREC:
120             case PUBREL:
121             case PUBCOMP:
122                 return encodePubReplyMessage(ctx, message);
123 
124             case DISCONNECT:
125             case AUTH:
126                 return encodeReasonCodePlusPropertiesMessage(ctx, message);
127 
128             case PINGREQ:
129             case PINGRESP:
130                 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
131 
132             default:
133                 throw new IllegalArgumentException(
134                         "Unknown message type: " + message.fixedHeader().messageType().value());
135         }
136     }
137 
138     private static ByteBuf encodeConnectMessage(
139             ChannelHandlerContext ctx,
140             MqttConnectMessage message) {
141         int payloadBufferSize = 0;
142 
143         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
144         MqttConnectVariableHeader variableHeader = message.variableHeader();
145         MqttConnectPayload payload = message.payload();
146         MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
147                 (byte) variableHeader.version());
148         setMqttVersion(ctx, mqttVersion);
149 
150         // as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
151         if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
152             throw new EncoderException("Without a username, the password MUST be not set");
153         }
154 
155         // Client id
156         String clientIdentifier = payload.clientIdentifier();
157         if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
158             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
159         }
160         int clientIdentifierBytes = utf8Bytes(clientIdentifier);
161         payloadBufferSize += 2 + clientIdentifierBytes;
162 
163         // Will topic and message
164         String willTopic = payload.willTopic();
165         int willTopicBytes = nullableUtf8Bytes(willTopic);
166         byte[] willMessage = payload.willMessageInBytes();
167         byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
168         if (variableHeader.isWillFlag()) {
169             payloadBufferSize += 2 + willTopicBytes;
170             payloadBufferSize += 2 + willMessageBytes.length;
171         }
172 
173         String userName = payload.userName();
174         int userNameBytes = nullableUtf8Bytes(userName);
175         if (variableHeader.hasUserName()) {
176             payloadBufferSize += 2 + userNameBytes;
177         }
178 
179         byte[] password = payload.passwordInBytes();
180         byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
181         if (variableHeader.hasPassword()) {
182             payloadBufferSize += 2 + passwordBytes.length;
183         }
184 
185         // Fixed and variable header
186         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
187         ByteBuf propertiesBuf = encodePropertiesIfNeeded(
188                 mqttVersion,
189                 ctx.alloc(),
190                 message.variableHeader().properties());
191         try {
192             final ByteBuf willPropertiesBuf;
193             if (variableHeader.isWillFlag()) {
194                 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
195                 payloadBufferSize += willPropertiesBuf.readableBytes();
196             } else {
197                 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
198             }
199             try {
200                 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
201 
202                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
203                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
204                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
205                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
206                 writeVariableLengthInt(buf, variablePartSize);
207 
208                 buf.writeShort(protocolNameBytes.length);
209                 buf.writeBytes(protocolNameBytes);
210 
211                 buf.writeByte(variableHeader.version());
212                 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
213                 buf.writeShort(variableHeader.keepAliveTimeSeconds());
214                 buf.writeBytes(propertiesBuf);
215 
216                 // Payload
217                 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
218                 if (variableHeader.isWillFlag()) {
219                     buf.writeBytes(willPropertiesBuf);
220                     writeExactUTF8String(buf, willTopic, willTopicBytes);
221                     buf.writeShort(willMessageBytes.length);
222                     buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
223                 }
224                 if (variableHeader.hasUserName()) {
225                     writeExactUTF8String(buf, userName, userNameBytes);
226                 }
227                 if (variableHeader.hasPassword()) {
228                     buf.writeShort(passwordBytes.length);
229                     buf.writeBytes(passwordBytes, 0, passwordBytes.length);
230                 }
231                 return buf;
232             } finally {
233                 willPropertiesBuf.release();
234             }
235         } finally {
236             propertiesBuf.release();
237         }
238     }
239 
240     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
241         int flagByte = 0;
242         if (variableHeader.hasUserName()) {
243             flagByte |= 0x80;
244         }
245         if (variableHeader.hasPassword()) {
246             flagByte |= 0x40;
247         }
248         if (variableHeader.isWillRetain()) {
249             flagByte |= 0x20;
250         }
251         flagByte |= (variableHeader.willQos() & 0x03) << 3;
252         if (variableHeader.isWillFlag()) {
253             flagByte |= 0x04;
254         }
255         if (variableHeader.isCleanSession()) {
256             flagByte |= 0x02;
257         }
258         return flagByte;
259     }
260 
261     private static ByteBuf encodeConnAckMessage(
262             ChannelHandlerContext ctx,
263             MqttConnAckMessage message) {
264         final MqttVersion mqttVersion = getMqttVersion(ctx);
265         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
266                 ctx.alloc(),
267                 message.variableHeader().properties());
268 
269         try {
270             ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
271             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
272             writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
273             buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
274             buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
275             buf.writeBytes(propertiesBuf);
276             return buf;
277         } finally {
278             propertiesBuf.release();
279         }
280     }
281 
282     private static ByteBuf encodeSubscribeMessage(
283             ChannelHandlerContext ctx,
284             MqttSubscribeMessage message) {
285         MqttVersion mqttVersion = getMqttVersion(ctx);
286         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
287                 ctx.alloc(),
288                 message.idAndPropertiesVariableHeader().properties());
289 
290         try {
291             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
292             int payloadBufferSize = 0;
293 
294             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
295             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
296             MqttSubscribePayload payload = message.payload();
297 
298             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
299                 String topicName = topic.topicName();
300                 int topicNameBytes = utf8Bytes(topicName);
301                 payloadBufferSize += 2 + topicNameBytes;
302                 payloadBufferSize += 1;
303             }
304 
305             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
306             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
307 
308             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
309             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
310             writeVariableLengthInt(buf, variablePartSize);
311 
312             // Variable Header
313             int messageId = variableHeader.messageId();
314             buf.writeShort(messageId);
315             buf.writeBytes(propertiesBuf);
316 
317             // Payload
318             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
319                 writeUnsafeUTF8String(buf, topic.topicName());
320                 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
321                     buf.writeByte(topic.qualityOfService().value());
322                 } else {
323                     final MqttSubscriptionOption option = topic.option();
324 
325                     int optionEncoded = option.retainHandling().value() << 4;
326                     if (option.isRetainAsPublished()) {
327                         optionEncoded |= 0x08;
328                     }
329                     if (option.isNoLocal()) {
330                         optionEncoded |= 0x04;
331                     }
332                     optionEncoded |= option.qos().value();
333 
334                     buf.writeByte(optionEncoded);
335                 }
336             }
337 
338             return buf;
339         } finally {
340             propertiesBuf.release();
341         }
342     }
343 
344     private static ByteBuf encodeUnsubscribeMessage(
345             ChannelHandlerContext ctx,
346             MqttUnsubscribeMessage message) {
347         MqttVersion mqttVersion = getMqttVersion(ctx);
348         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
349                 ctx.alloc(),
350                 message.idAndPropertiesVariableHeader().properties());
351 
352         try {
353             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
354             int payloadBufferSize = 0;
355 
356             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
357             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
358             MqttUnsubscribePayload payload = message.payload();
359 
360             for (String topicName : payload.topics()) {
361                 int topicNameBytes = utf8Bytes(topicName);
362                 payloadBufferSize += 2 + topicNameBytes;
363             }
364 
365             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
366             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
367 
368             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
369             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
370             writeVariableLengthInt(buf, variablePartSize);
371 
372             // Variable Header
373             int messageId = variableHeader.messageId();
374             buf.writeShort(messageId);
375             buf.writeBytes(propertiesBuf);
376 
377             // Payload
378             for (String topicName : payload.topics()) {
379                 writeUnsafeUTF8String(buf, topicName);
380             }
381 
382             return buf;
383         } finally {
384             propertiesBuf.release();
385         }
386     }
387 
388     private static ByteBuf encodeSubAckMessage(
389             ChannelHandlerContext ctx,
390             MqttSubAckMessage message) {
391         MqttVersion mqttVersion = getMqttVersion(ctx);
392         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
393                 ctx.alloc(),
394                 message.idAndPropertiesVariableHeader().properties());
395         try {
396             int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
397             int payloadBufferSize = message.payload().grantedQoSLevels().size();
398             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
399             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
400             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
401             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
402             writeVariableLengthInt(buf, variablePartSize);
403             buf.writeShort(message.variableHeader().messageId());
404             buf.writeBytes(propertiesBuf);
405             for (int code: message.payload().reasonCodes()) {
406                 buf.writeByte(code);
407             }
408 
409             return buf;
410         } finally {
411             propertiesBuf.release();
412         }
413     }
414 
415     private static ByteBuf encodeUnsubAckMessage(
416             ChannelHandlerContext ctx,
417             MqttUnsubAckMessage message) {
418         if (message.variableHeader() instanceof  MqttMessageIdAndPropertiesVariableHeader) {
419             MqttVersion mqttVersion = getMqttVersion(ctx);
420             ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
421                     ctx.alloc(),
422                     message.idAndPropertiesVariableHeader().properties());
423             try {
424                 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
425                 MqttUnsubAckPayload payload = message.payload();
426                 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
427                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
428                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
429                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
430                 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
431                 writeVariableLengthInt(buf, variablePartSize);
432                 buf.writeShort(message.variableHeader().messageId());
433                 buf.writeBytes(propertiesBuf);
434 
435                 if (payload != null) {
436                     for (Short reasonCode : payload.unsubscribeReasonCodes()) {
437                         buf.writeByte(reasonCode);
438                     }
439                 }
440 
441                 return buf;
442             } finally {
443                 propertiesBuf.release();
444             }
445         } else {
446             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
447         }
448     }
449 
450     private static ByteBuf encodePublishMessage(
451             ChannelHandlerContext ctx,
452             MqttPublishMessage message) {
453         MqttVersion mqttVersion = getMqttVersion(ctx);
454         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
455         MqttPublishVariableHeader variableHeader = message.variableHeader();
456         ByteBuf payload = message.payload().duplicate();
457 
458         String topicName = variableHeader.topicName();
459         int topicNameBytes = utf8Bytes(topicName);
460 
461         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
462                 ctx.alloc(),
463                 message.variableHeader().properties());
464 
465         try {
466             boolean qosLevelGreaterZero = mqttFixedHeader.qosLevel().value() > 0;
467             int variableHeaderBufferSize = 2 + topicNameBytes +
468                     (qosLevelGreaterZero ? 2 : 0) + propertiesBuf.readableBytes();
469             int payloadBufferSize = payload.readableBytes();
470             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
471             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
472 
473             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
474             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
475             writeVariableLengthInt(buf, variablePartSize);
476             writeExactUTF8String(buf, topicName, topicNameBytes);
477             if (qosLevelGreaterZero) {
478                 buf.writeShort(variableHeader.packetId());
479             }
480             buf.writeBytes(propertiesBuf);
481             buf.writeBytes(payload);
482 
483             return buf;
484         } finally {
485             propertiesBuf.release();
486         }
487     }
488 
489     private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
490                                           MqttMessage message) {
491         if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
492             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
493             MqttPubReplyMessageVariableHeader variableHeader =
494                     (MqttPubReplyMessageVariableHeader) message.variableHeader();
495             int msgId = variableHeader.messageId();
496 
497             final ByteBuf propertiesBuf;
498             final boolean includeReasonCode;
499             final int variableHeaderBufferSize;
500             final MqttVersion mqttVersion = getMqttVersion(ctx);
501             if (mqttVersion == MqttVersion.MQTT_5 &&
502                     (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
503                             !variableHeader.properties().isEmpty())) {
504                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
505                 includeReasonCode = true;
506                 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
507             } else {
508                 propertiesBuf = Unpooled.EMPTY_BUFFER;
509                 includeReasonCode = false;
510                 variableHeaderBufferSize = 2;
511             }
512 
513             try {
514                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
515                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
516                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
517                 writeVariableLengthInt(buf, variableHeaderBufferSize);
518                 buf.writeShort(msgId);
519                 if (includeReasonCode) {
520                     buf.writeByte(variableHeader.reasonCode());
521                 }
522                 buf.writeBytes(propertiesBuf);
523 
524                 return buf;
525             } finally {
526                 propertiesBuf.release();
527             }
528         } else {
529             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
530         }
531     }
532 
533     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
534             ByteBufAllocator byteBufAllocator,
535             MqttMessage message) {
536         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
537         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
538         int msgId = variableHeader.messageId();
539 
540         int variableHeaderBufferSize = 2; // variable part only has a message id
541         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
542         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
543         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
544         writeVariableLengthInt(buf, variableHeaderBufferSize);
545         buf.writeShort(msgId);
546 
547         return buf;
548     }
549 
550     private static ByteBuf encodeReasonCodePlusPropertiesMessage(
551             ChannelHandlerContext ctx,
552             MqttMessage message) {
553         if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
554             MqttVersion mqttVersion = getMqttVersion(ctx);
555             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
556             MqttReasonCodeAndPropertiesVariableHeader variableHeader =
557                     (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
558 
559             final ByteBuf propertiesBuf;
560             final boolean includeReasonCode;
561             final int variableHeaderBufferSize;
562             if (mqttVersion == MqttVersion.MQTT_5 &&
563                     (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
564                             !variableHeader.properties().isEmpty())) {
565                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
566                 includeReasonCode = true;
567                 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
568             } else {
569                 propertiesBuf = Unpooled.EMPTY_BUFFER;
570                 includeReasonCode = false;
571                 variableHeaderBufferSize = 0;
572             }
573 
574             try {
575                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
576                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
577                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
578                 writeVariableLengthInt(buf, variableHeaderBufferSize);
579                 if (includeReasonCode) {
580                     buf.writeByte(variableHeader.reasonCode());
581                 }
582                 buf.writeBytes(propertiesBuf);
583 
584                 return buf;
585             } finally {
586                 propertiesBuf.release();
587             }
588         } else {
589             return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
590         }
591     }
592 
593     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
594             ByteBufAllocator byteBufAllocator,
595             MqttMessage message) {
596         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
597         ByteBuf buf = byteBufAllocator.buffer(2);
598         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
599         buf.writeByte(0);
600 
601         return buf;
602     }
603 
604     private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
605                                              ByteBufAllocator byteBufAllocator,
606                                              MqttProperties mqttProperties) {
607         if (mqttVersion == MqttVersion.MQTT_5) {
608             return encodeProperties(byteBufAllocator, mqttProperties);
609         }
610         return Unpooled.EMPTY_BUFFER;
611     }
612 
613     private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
614                                             MqttProperties mqttProperties) {
615         ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
616         // encode also the Properties part
617         try {
618             ByteBuf propertiesBuf = byteBufAllocator.buffer();
619             try {
620                 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
621                     int propertyId = property.propertyId;
622                     switch (propertyId) {
623                         case PAYLOAD_FORMAT_INDICATOR:
624                         case REQUEST_PROBLEM_INFORMATION:
625                         case REQUEST_RESPONSE_INFORMATION:
626                         case MAXIMUM_QOS:
627                         case RETAIN_AVAILABLE:
628                         case WILDCARD_SUBSCRIPTION_AVAILABLE:
629                         case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
630                         case SHARED_SUBSCRIPTION_AVAILABLE:
631                             writeVariableLengthInt(propertiesBuf, propertyId);
632                             final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
633                             propertiesBuf.writeByte(bytePropValue);
634                             break;
635                         case SERVER_KEEP_ALIVE:
636                         case RECEIVE_MAXIMUM:
637                         case TOPIC_ALIAS_MAXIMUM:
638                         case TOPIC_ALIAS:
639                             writeVariableLengthInt(propertiesBuf, propertyId);
640                             final short twoBytesInPropValue =
641                                     ((MqttProperties.IntegerProperty) property).value.shortValue();
642                             propertiesBuf.writeShort(twoBytesInPropValue);
643                             break;
644                         case PUBLICATION_EXPIRY_INTERVAL:
645                         case SESSION_EXPIRY_INTERVAL:
646                         case WILL_DELAY_INTERVAL:
647                         case MAXIMUM_PACKET_SIZE:
648                             writeVariableLengthInt(propertiesBuf, propertyId);
649                             final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
650                             propertiesBuf.writeInt(fourBytesIntPropValue);
651                             break;
652                         case SUBSCRIPTION_IDENTIFIER:
653                             writeVariableLengthInt(propertiesBuf, propertyId);
654                             final int vbi = ((MqttProperties.IntegerProperty) property).value;
655                             writeVariableLengthInt(propertiesBuf, vbi);
656                             break;
657                         case CONTENT_TYPE:
658                         case RESPONSE_TOPIC:
659                         case ASSIGNED_CLIENT_IDENTIFIER:
660                         case AUTHENTICATION_METHOD:
661                         case RESPONSE_INFORMATION:
662                         case SERVER_REFERENCE:
663                         case REASON_STRING:
664                             writeVariableLengthInt(propertiesBuf, propertyId);
665                             writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
666                             break;
667                         case USER_PROPERTY:
668                             final List<MqttProperties.StringPair> pairs =
669                                     ((MqttProperties.UserProperties) property).value;
670                             for (MqttProperties.StringPair pair : pairs) {
671                                 writeVariableLengthInt(propertiesBuf, propertyId);
672                                 writeEagerUTF8String(propertiesBuf, pair.key);
673                                 writeEagerUTF8String(propertiesBuf, pair.value);
674                             }
675                             break;
676                         case CORRELATION_DATA:
677                         case AUTHENTICATION_DATA:
678                             writeVariableLengthInt(propertiesBuf, propertyId);
679                             final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
680                             propertiesBuf.writeShort(binaryPropValue.length);
681                             propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
682                             break;
683                         default:
684                             //shouldn't reach here
685                             throw new EncoderException("Unknown property type: " + propertyId);
686                     }
687                 }
688                 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
689                 propertiesHeaderBuf.writeBytes(propertiesBuf);
690 
691                 return propertiesHeaderBuf;
692             } finally {
693                 propertiesBuf.release();
694             }
695         } catch (RuntimeException e) {
696             propertiesHeaderBuf.release();
697             throw e;
698         }
699     }
700 
701     private static int getFixedHeaderByte1(MqttFixedHeader header) {
702         int ret = 0;
703         ret |= header.messageType().value() << 4;
704         if (header.isDup()) {
705             ret |= 0x08;
706         }
707         ret |= header.qosLevel().value() << 1;
708         if (header.isRetain()) {
709             ret |= 0x01;
710         }
711         return ret;
712     }
713 
714     private static void writeVariableLengthInt(ByteBuf buf, int num) {
715         do {
716             int digit = num & 0x7F;
717             num >>>= 7;
718             if (num > 0) {
719                 digit |= 0x80;
720             }
721             buf.writeByte(digit);
722         } while (num > 0);
723     }
724 
725     private static int nullableUtf8Bytes(String s) {
726         return s == null? 0 : utf8Bytes(s);
727     }
728 
729     private static int nullableMaxUtf8Bytes(String s) {
730         return s == null? 0 : utf8MaxBytes(s);
731     }
732 
733     private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
734         buf.ensureWritable(utf8Length + 2);
735         buf.writeShort(utf8Length);
736         if (utf8Length > 0) {
737             final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
738             assert writtenUtf8Length == utf8Length;
739         }
740     }
741 
742     private static void writeEagerUTF8String(ByteBuf buf, String s) {
743         final int maxUtf8Length = nullableMaxUtf8Bytes(s);
744         buf.ensureWritable(maxUtf8Length + 2);
745         final int writerIndex = buf.writerIndex();
746         final int startUtf8String = writerIndex + 2;
747         buf.writerIndex(startUtf8String);
748         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
749         buf.setShort(writerIndex, utf8Length);
750     }
751 
752     private static void writeUnsafeUTF8String(ByteBuf buf, String s) {
753         final int writerIndex = buf.writerIndex();
754         final int startUtf8String = writerIndex + 2;
755         // no need to reserve any capacity here, already done earlier: that's why is Unsafe
756         buf.writerIndex(startUtf8String);
757         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, 0) : 0;
758         buf.setShort(writerIndex, utf8Length);
759     }
760 
761     private static int getVariableLengthInt(int num) {
762         if (num < 128) {
763             return 1;
764         }
765         if (num < 16_384) { // 128 * 128
766             return 2;
767         }
768         if (num < 2_097_152) { // 128 * 128 * 128
769             return 3;
770         }
771         return 4;
772     }
773 
774 }