View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.handler.codec.EncoderException;
25  import io.netty.handler.codec.MessageToMessageEncoder;
26  import io.netty.util.internal.EmptyArrays;
27  
28  import java.util.List;
29  
30  import static io.netty.buffer.ByteBufUtil.*;
31  import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35  
36  /**
37   * Encodes Mqtt messages into bytes following the protocol specification v3.1
38   * as described here <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
39   * or v5.0 as described here <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">MQTTv5.0</a> -
40   * depending on the version specified in the first CONNECT message that goes through the channel.
41   */
42  @ChannelHandler.Sharable
43  public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
44  
45      public static final MqttEncoder INSTANCE = new MqttEncoder();
46  
47      private MqttEncoder() { }
48  
49      @Override
50      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
51          out.add(doEncode(ctx, msg));
52      }
53  
54      /**
55       * This is the main encoding method.
56       * It's only visible for testing.
57       *
58       * @param message MQTT message to encode
59       * @return ByteBuf with encoded bytes
60       */
61      static ByteBuf doEncode(ChannelHandlerContext ctx,
62                       MqttMessage message) {
63  
64          switch (message.fixedHeader().messageType()) {
65              case CONNECT:
66                  return encodeConnectMessage(ctx, (MqttConnectMessage) message);
67  
68              case CONNACK:
69                  return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
70  
71              case PUBLISH:
72                  return encodePublishMessage(ctx, (MqttPublishMessage) message);
73  
74              case SUBSCRIBE:
75                  return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
76  
77              case UNSUBSCRIBE:
78                  return encodeUnsubscribeMessage(ctx,  (MqttUnsubscribeMessage) message);
79  
80              case SUBACK:
81                  return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
82  
83              case UNSUBACK:
84                  if (message instanceof MqttUnsubAckMessage) {
85                      return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
86                  }
87                  return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
88  
89              case PUBACK:
90              case PUBREC:
91              case PUBREL:
92              case PUBCOMP:
93                  return encodePubReplyMessage(ctx, message);
94  
95              case DISCONNECT:
96              case AUTH:
97                  return encodeReasonCodePlusPropertiesMessage(ctx, message);
98  
99              case PINGREQ:
100             case PINGRESP:
101                 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
102 
103             default:
104                 throw new IllegalArgumentException(
105                         "Unknown message type: " + message.fixedHeader().messageType().value());
106         }
107     }
108 
109     private static ByteBuf encodeConnectMessage(
110             ChannelHandlerContext ctx,
111             MqttConnectMessage message) {
112         int payloadBufferSize = 0;
113 
114         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
115         MqttConnectVariableHeader variableHeader = message.variableHeader();
116         MqttConnectPayload payload = message.payload();
117         MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
118                 (byte) variableHeader.version());
119         setMqttVersion(ctx, mqttVersion);
120 
121         // as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
122         if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
123             throw new EncoderException("Without a username, the password MUST be not set");
124         }
125 
126         // Client id
127         String clientIdentifier = payload.clientIdentifier();
128         if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
129             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
130         }
131         int clientIdentifierBytes = utf8Bytes(clientIdentifier);
132         payloadBufferSize += 2 + clientIdentifierBytes;
133 
134         // Will topic and message
135         String willTopic = payload.willTopic();
136         int willTopicBytes = nullableUtf8Bytes(willTopic);
137         byte[] willMessage = payload.willMessageInBytes();
138         byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
139         if (variableHeader.isWillFlag()) {
140             payloadBufferSize += 2 + willTopicBytes;
141             payloadBufferSize += 2 + willMessageBytes.length;
142         }
143 
144         String userName = payload.userName();
145         int userNameBytes = nullableUtf8Bytes(userName);
146         if (variableHeader.hasUserName()) {
147             payloadBufferSize += 2 + userNameBytes;
148         }
149 
150         byte[] password = payload.passwordInBytes();
151         byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
152         if (variableHeader.hasPassword()) {
153             payloadBufferSize += 2 + passwordBytes.length;
154         }
155 
156         // Fixed and variable header
157         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
158         ByteBuf propertiesBuf = encodePropertiesIfNeeded(
159                 mqttVersion,
160                 ctx.alloc(),
161                 message.variableHeader().properties());
162         try {
163             final ByteBuf willPropertiesBuf;
164             if (variableHeader.isWillFlag()) {
165                 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
166                 payloadBufferSize += willPropertiesBuf.readableBytes();
167             } else {
168                 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
169             }
170             try {
171                 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
172 
173                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
174                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
175                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
176                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
177                 writeVariableLengthInt(buf, variablePartSize);
178 
179                 buf.writeShort(protocolNameBytes.length);
180                 buf.writeBytes(protocolNameBytes);
181 
182                 buf.writeByte(variableHeader.version());
183                 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
184                 buf.writeShort(variableHeader.keepAliveTimeSeconds());
185                 buf.writeBytes(propertiesBuf);
186 
187                 // Payload
188                 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
189                 if (variableHeader.isWillFlag()) {
190                     buf.writeBytes(willPropertiesBuf);
191                     writeExactUTF8String(buf, willTopic, willTopicBytes);
192                     buf.writeShort(willMessageBytes.length);
193                     buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
194                 }
195                 if (variableHeader.hasUserName()) {
196                     writeExactUTF8String(buf, userName, userNameBytes);
197                 }
198                 if (variableHeader.hasPassword()) {
199                     buf.writeShort(passwordBytes.length);
200                     buf.writeBytes(passwordBytes, 0, passwordBytes.length);
201                 }
202                 return buf;
203             } finally {
204                 willPropertiesBuf.release();
205             }
206         } finally {
207             propertiesBuf.release();
208         }
209     }
210 
211     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
212         int flagByte = 0;
213         if (variableHeader.hasUserName()) {
214             flagByte |= 0x80;
215         }
216         if (variableHeader.hasPassword()) {
217             flagByte |= 0x40;
218         }
219         if (variableHeader.isWillRetain()) {
220             flagByte |= 0x20;
221         }
222         flagByte |= (variableHeader.willQos() & 0x03) << 3;
223         if (variableHeader.isWillFlag()) {
224             flagByte |= 0x04;
225         }
226         if (variableHeader.isCleanSession()) {
227             flagByte |= 0x02;
228         }
229         return flagByte;
230     }
231 
232     private static ByteBuf encodeConnAckMessage(
233             ChannelHandlerContext ctx,
234             MqttConnAckMessage message) {
235         final MqttVersion mqttVersion = getMqttVersion(ctx);
236         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
237                 ctx.alloc(),
238                 message.variableHeader().properties());
239 
240         try {
241             ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
242             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
243             writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
244             buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
245             buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
246             buf.writeBytes(propertiesBuf);
247             return buf;
248         } finally {
249             propertiesBuf.release();
250         }
251     }
252 
253     private static ByteBuf encodeSubscribeMessage(
254             ChannelHandlerContext ctx,
255             MqttSubscribeMessage message) {
256         MqttVersion mqttVersion = getMqttVersion(ctx);
257         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
258                 ctx.alloc(),
259                 message.idAndPropertiesVariableHeader().properties());
260 
261         try {
262             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
263             int payloadBufferSize = 0;
264 
265             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
266             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
267             MqttSubscribePayload payload = message.payload();
268 
269             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
270                 String topicName = topic.topicName();
271                 int topicNameBytes = utf8Bytes(topicName);
272                 payloadBufferSize += 2 + topicNameBytes;
273                 payloadBufferSize += 1;
274             }
275 
276             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
277             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
278 
279             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
280             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
281             writeVariableLengthInt(buf, variablePartSize);
282 
283             // Variable Header
284             int messageId = variableHeader.messageId();
285             buf.writeShort(messageId);
286             buf.writeBytes(propertiesBuf);
287 
288             // Payload
289             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
290                 writeUnsafeUTF8String(buf, topic.topicName());
291                 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
292                     buf.writeByte(topic.qualityOfService().value());
293                 } else {
294                     final MqttSubscriptionOption option = topic.option();
295 
296                     int optionEncoded = option.retainHandling().value() << 4;
297                     if (option.isRetainAsPublished()) {
298                         optionEncoded |= 0x08;
299                     }
300                     if (option.isNoLocal()) {
301                         optionEncoded |= 0x04;
302                     }
303                     optionEncoded |= option.qos().value();
304 
305                     buf.writeByte(optionEncoded);
306                 }
307             }
308 
309             return buf;
310         } finally {
311             propertiesBuf.release();
312         }
313     }
314 
315     private static ByteBuf encodeUnsubscribeMessage(
316             ChannelHandlerContext ctx,
317             MqttUnsubscribeMessage message) {
318         MqttVersion mqttVersion = getMqttVersion(ctx);
319         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
320                 ctx.alloc(),
321                 message.idAndPropertiesVariableHeader().properties());
322 
323         try {
324             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
325             int payloadBufferSize = 0;
326 
327             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
328             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
329             MqttUnsubscribePayload payload = message.payload();
330 
331             for (String topicName : payload.topics()) {
332                 int topicNameBytes = utf8Bytes(topicName);
333                 payloadBufferSize += 2 + topicNameBytes;
334             }
335 
336             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
337             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
338 
339             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
340             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
341             writeVariableLengthInt(buf, variablePartSize);
342 
343             // Variable Header
344             int messageId = variableHeader.messageId();
345             buf.writeShort(messageId);
346             buf.writeBytes(propertiesBuf);
347 
348             // Payload
349             for (String topicName : payload.topics()) {
350                 writeUnsafeUTF8String(buf, topicName);
351             }
352 
353             return buf;
354         } finally {
355             propertiesBuf.release();
356         }
357     }
358 
359     private static ByteBuf encodeSubAckMessage(
360             ChannelHandlerContext ctx,
361             MqttSubAckMessage message) {
362         MqttVersion mqttVersion = getMqttVersion(ctx);
363         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
364                 ctx.alloc(),
365                 message.idAndPropertiesVariableHeader().properties());
366         try {
367             int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
368             int payloadBufferSize = message.payload().grantedQoSLevels().size();
369             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
370             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
371             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
372             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
373             writeVariableLengthInt(buf, variablePartSize);
374             buf.writeShort(message.variableHeader().messageId());
375             buf.writeBytes(propertiesBuf);
376             for (int code: message.payload().reasonCodes()) {
377                 buf.writeByte(code);
378             }
379 
380             return buf;
381         } finally {
382             propertiesBuf.release();
383         }
384     }
385 
386     private static ByteBuf encodeUnsubAckMessage(
387             ChannelHandlerContext ctx,
388             MqttUnsubAckMessage message) {
389         if (message.variableHeader() instanceof  MqttMessageIdAndPropertiesVariableHeader) {
390             MqttVersion mqttVersion = getMqttVersion(ctx);
391             ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
392                     ctx.alloc(),
393                     message.idAndPropertiesVariableHeader().properties());
394             try {
395                 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
396                 MqttUnsubAckPayload payload = message.payload();
397                 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
398                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
399                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
400                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
401                 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
402                 writeVariableLengthInt(buf, variablePartSize);
403                 buf.writeShort(message.variableHeader().messageId());
404                 buf.writeBytes(propertiesBuf);
405 
406                 if (payload != null) {
407                     for (Short reasonCode : payload.unsubscribeReasonCodes()) {
408                         buf.writeByte(reasonCode);
409                     }
410                 }
411 
412                 return buf;
413             } finally {
414                 propertiesBuf.release();
415             }
416         } else {
417             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
418         }
419     }
420 
421     private static ByteBuf encodePublishMessage(
422             ChannelHandlerContext ctx,
423             MqttPublishMessage message) {
424         MqttVersion mqttVersion = getMqttVersion(ctx);
425         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
426         MqttPublishVariableHeader variableHeader = message.variableHeader();
427         ByteBuf payload = message.payload().duplicate();
428 
429         String topicName = variableHeader.topicName();
430         int topicNameBytes = utf8Bytes(topicName);
431 
432         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
433                 ctx.alloc(),
434                 message.variableHeader().properties());
435 
436         try {
437             boolean qosLevelGreaterZero = mqttFixedHeader.qosLevel().value() > 0;
438             int variableHeaderBufferSize = 2 + topicNameBytes +
439                     (qosLevelGreaterZero ? 2 : 0) + propertiesBuf.readableBytes();
440             int payloadBufferSize = payload.readableBytes();
441             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
442             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
443 
444             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
445             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
446             writeVariableLengthInt(buf, variablePartSize);
447             writeExactUTF8String(buf, topicName, topicNameBytes);
448             if (qosLevelGreaterZero) {
449                 buf.writeShort(variableHeader.packetId());
450             }
451             buf.writeBytes(propertiesBuf);
452             buf.writeBytes(payload);
453 
454             return buf;
455         } finally {
456             propertiesBuf.release();
457         }
458     }
459 
460     private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
461                                           MqttMessage message) {
462         if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
463             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
464             MqttPubReplyMessageVariableHeader variableHeader =
465                     (MqttPubReplyMessageVariableHeader) message.variableHeader();
466             int msgId = variableHeader.messageId();
467 
468             final ByteBuf propertiesBuf;
469             final boolean includeReasonCode;
470             final int variableHeaderBufferSize;
471             final MqttVersion mqttVersion = getMqttVersion(ctx);
472             if (mqttVersion == MqttVersion.MQTT_5 &&
473                     (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
474                             !variableHeader.properties().isEmpty())) {
475                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
476                 includeReasonCode = true;
477                 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
478             } else {
479                 propertiesBuf = Unpooled.EMPTY_BUFFER;
480                 includeReasonCode = false;
481                 variableHeaderBufferSize = 2;
482             }
483 
484             try {
485                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
486                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
487                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
488                 writeVariableLengthInt(buf, variableHeaderBufferSize);
489                 buf.writeShort(msgId);
490                 if (includeReasonCode) {
491                     buf.writeByte(variableHeader.reasonCode());
492                 }
493                 buf.writeBytes(propertiesBuf);
494 
495                 return buf;
496             } finally {
497                 propertiesBuf.release();
498             }
499         } else {
500             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
501         }
502     }
503 
504     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
505             ByteBufAllocator byteBufAllocator,
506             MqttMessage message) {
507         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
508         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
509         int msgId = variableHeader.messageId();
510 
511         int variableHeaderBufferSize = 2; // variable part only has a message id
512         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
513         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
514         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
515         writeVariableLengthInt(buf, variableHeaderBufferSize);
516         buf.writeShort(msgId);
517 
518         return buf;
519     }
520 
521     private static ByteBuf encodeReasonCodePlusPropertiesMessage(
522             ChannelHandlerContext ctx,
523             MqttMessage message) {
524         if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
525             MqttVersion mqttVersion = getMqttVersion(ctx);
526             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
527             MqttReasonCodeAndPropertiesVariableHeader variableHeader =
528                     (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
529 
530             final ByteBuf propertiesBuf;
531             final boolean includeReasonCode;
532             final int variableHeaderBufferSize;
533             if (mqttVersion == MqttVersion.MQTT_5 &&
534                     (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
535                             !variableHeader.properties().isEmpty())) {
536                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
537                 includeReasonCode = true;
538                 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
539             } else {
540                 propertiesBuf = Unpooled.EMPTY_BUFFER;
541                 includeReasonCode = false;
542                 variableHeaderBufferSize = 0;
543             }
544 
545             try {
546                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
547                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
548                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
549                 writeVariableLengthInt(buf, variableHeaderBufferSize);
550                 if (includeReasonCode) {
551                     buf.writeByte(variableHeader.reasonCode());
552                 }
553                 buf.writeBytes(propertiesBuf);
554 
555                 return buf;
556             } finally {
557                 propertiesBuf.release();
558             }
559         } else {
560             return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
561         }
562     }
563 
564     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
565             ByteBufAllocator byteBufAllocator,
566             MqttMessage message) {
567         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
568         ByteBuf buf = byteBufAllocator.buffer(2);
569         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
570         buf.writeByte(0);
571 
572         return buf;
573     }
574 
575     private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
576                                              ByteBufAllocator byteBufAllocator,
577                                              MqttProperties mqttProperties) {
578         if (mqttVersion == MqttVersion.MQTT_5) {
579             return encodeProperties(byteBufAllocator, mqttProperties);
580         }
581         return Unpooled.EMPTY_BUFFER;
582     }
583 
584     private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
585                                             MqttProperties mqttProperties) {
586         ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
587         // encode also the Properties part
588         try {
589             ByteBuf propertiesBuf = byteBufAllocator.buffer();
590             try {
591                 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
592                     MqttProperties.MqttPropertyType propertyType =
593                             MqttProperties.MqttPropertyType.valueOf(property.propertyId);
594                     switch (propertyType) {
595                         case PAYLOAD_FORMAT_INDICATOR:
596                         case REQUEST_PROBLEM_INFORMATION:
597                         case REQUEST_RESPONSE_INFORMATION:
598                         case MAXIMUM_QOS:
599                         case RETAIN_AVAILABLE:
600                         case WILDCARD_SUBSCRIPTION_AVAILABLE:
601                         case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
602                         case SHARED_SUBSCRIPTION_AVAILABLE:
603                             writeVariableLengthInt(propertiesBuf, property.propertyId);
604                             final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
605                             propertiesBuf.writeByte(bytePropValue);
606                             break;
607                         case SERVER_KEEP_ALIVE:
608                         case RECEIVE_MAXIMUM:
609                         case TOPIC_ALIAS_MAXIMUM:
610                         case TOPIC_ALIAS:
611                             writeVariableLengthInt(propertiesBuf, property.propertyId);
612                             final short twoBytesInPropValue =
613                                     ((MqttProperties.IntegerProperty) property).value.shortValue();
614                             propertiesBuf.writeShort(twoBytesInPropValue);
615                             break;
616                         case PUBLICATION_EXPIRY_INTERVAL:
617                         case SESSION_EXPIRY_INTERVAL:
618                         case WILL_DELAY_INTERVAL:
619                         case MAXIMUM_PACKET_SIZE:
620                             writeVariableLengthInt(propertiesBuf, property.propertyId);
621                             final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
622                             propertiesBuf.writeInt(fourBytesIntPropValue);
623                             break;
624                         case SUBSCRIPTION_IDENTIFIER:
625                             writeVariableLengthInt(propertiesBuf, property.propertyId);
626                             final int vbi = ((MqttProperties.IntegerProperty) property).value;
627                             writeVariableLengthInt(propertiesBuf, vbi);
628                             break;
629                         case CONTENT_TYPE:
630                         case RESPONSE_TOPIC:
631                         case ASSIGNED_CLIENT_IDENTIFIER:
632                         case AUTHENTICATION_METHOD:
633                         case RESPONSE_INFORMATION:
634                         case SERVER_REFERENCE:
635                         case REASON_STRING:
636                             writeVariableLengthInt(propertiesBuf, property.propertyId);
637                             writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
638                             break;
639                         case USER_PROPERTY:
640                             final List<MqttProperties.StringPair> pairs =
641                                     ((MqttProperties.UserProperties) property).value;
642                             for (MqttProperties.StringPair pair : pairs) {
643                                 writeVariableLengthInt(propertiesBuf, property.propertyId);
644                                 writeEagerUTF8String(propertiesBuf, pair.key);
645                                 writeEagerUTF8String(propertiesBuf, pair.value);
646                             }
647                             break;
648                         case CORRELATION_DATA:
649                         case AUTHENTICATION_DATA:
650                             writeVariableLengthInt(propertiesBuf, property.propertyId);
651                             final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
652                             propertiesBuf.writeShort(binaryPropValue.length);
653                             propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
654                             break;
655                         default:
656                             //shouldn't reach here
657                             throw new EncoderException("Unknown property type: " + propertyType);
658                     }
659                 }
660                 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
661                 propertiesHeaderBuf.writeBytes(propertiesBuf);
662 
663                 return propertiesHeaderBuf;
664             } finally {
665                 propertiesBuf.release();
666             }
667         } catch (RuntimeException e) {
668             propertiesHeaderBuf.release();
669             throw e;
670         }
671     }
672 
673     private static int getFixedHeaderByte1(MqttFixedHeader header) {
674         int ret = 0;
675         ret |= header.messageType().value() << 4;
676         if (header.isDup()) {
677             ret |= 0x08;
678         }
679         ret |= header.qosLevel().value() << 1;
680         if (header.isRetain()) {
681             ret |= 0x01;
682         }
683         return ret;
684     }
685 
686     private static void writeVariableLengthInt(ByteBuf buf, int num) {
687         do {
688             int digit = num % 128;
689             num /= 128;
690             if (num > 0) {
691                 digit |= 0x80;
692             }
693             buf.writeByte(digit);
694         } while (num > 0);
695     }
696 
697     private static int nullableUtf8Bytes(String s) {
698         return s == null? 0 : utf8Bytes(s);
699     }
700 
701     private static int nullableMaxUtf8Bytes(String s) {
702         return s == null? 0 : utf8MaxBytes(s);
703     }
704 
705     private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
706         buf.ensureWritable(utf8Length + 2);
707         buf.writeShort(utf8Length);
708         if (utf8Length > 0) {
709             final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
710             assert writtenUtf8Length == utf8Length;
711         }
712     }
713 
714     private static void writeEagerUTF8String(ByteBuf buf, String s) {
715         final int maxUtf8Length = nullableMaxUtf8Bytes(s);
716         buf.ensureWritable(maxUtf8Length + 2);
717         final int writerIndex = buf.writerIndex();
718         final int startUtf8String = writerIndex + 2;
719         buf.writerIndex(startUtf8String);
720         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
721         buf.setShort(writerIndex, utf8Length);
722     }
723 
724     private static void writeUnsafeUTF8String(ByteBuf buf, String s) {
725         final int writerIndex = buf.writerIndex();
726         final int startUtf8String = writerIndex + 2;
727         // no need to reserve any capacity here, already done earlier: that's why is Unsafe
728         buf.writerIndex(startUtf8String);
729         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, 0) : 0;
730         buf.setShort(writerIndex, utf8Length);
731     }
732 
733     private static int getVariableLengthInt(int num) {
734         if (num < 128) {
735             return 1;
736         }
737         if (num < 16_384) { // 128 * 128
738             return 2;
739         }
740         if (num < 2_097_152) { // 128 * 128 * 128
741             return 3;
742         }
743         return 4;
744     }
745 
746 }