View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.handler.codec.EncoderException;
25  import io.netty.handler.codec.MessageToMessageEncoder;
26  import io.netty.util.internal.EmptyArrays;
27  
28  import java.util.List;
29  
30  import static io.netty.buffer.ByteBufUtil.*;
31  import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35  
36  /**
37   * Encodes Mqtt messages into bytes following the protocol specification v3.1
38   * as described here <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
39   * or v5.0 as described here <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">MQTTv5.0</a> -
40   * depending on the version specified in the first CONNECT message that goes through the channel.
41   */
42  @ChannelHandler.Sharable
43  public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
44  
45      public static final MqttEncoder INSTANCE = new MqttEncoder(); // the only instance: the constructor is private
46  
47      private MqttEncoder() { } // private so that callers have to go through INSTANCE
48  
49      @Override
50      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
51          out.add(doEncode(ctx, msg));
52      }
53  
54      /**
55       * This is the main encoding method.
56       * It's only visible for testing.
57       *
58       * @param message MQTT message to encode
59       * @return ByteBuf with encoded bytes
60       */
61      static ByteBuf doEncode(ChannelHandlerContext ctx,
62                       MqttMessage message) {
63  
64          switch (message.fixedHeader().messageType()) {
65              case CONNECT:
66                  return encodeConnectMessage(ctx, (MqttConnectMessage) message);
67  
68              case CONNACK:
69                  return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
70  
71              case PUBLISH:
72                  return encodePublishMessage(ctx, (MqttPublishMessage) message);
73  
74              case SUBSCRIBE:
75                  return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
76  
77              case UNSUBSCRIBE:
78                  return encodeUnsubscribeMessage(ctx,  (MqttUnsubscribeMessage) message);
79  
80              case SUBACK:
81                  return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
82  
83              case UNSUBACK:
84                  if (message instanceof MqttUnsubAckMessage) {
85                      return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
86                  }
87                  return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
88  
89              case PUBACK:
90              case PUBREC:
91              case PUBREL:
92              case PUBCOMP:
93                  return encodePubReplyMessage(ctx, message);
94  
95              case DISCONNECT:
96              case AUTH:
97                  return encodeReasonCodePlusPropertiesMessage(ctx, message);
98  
99              case PINGREQ:
100             case PINGRESP:
101                 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
102 
103             default:
104                 throw new IllegalArgumentException(
105                         "Unknown message type: " + message.fixedHeader().messageType().value());
106         }
107     }
108 
109     private static ByteBuf encodeConnectMessage(
110             ChannelHandlerContext ctx,
111             MqttConnectMessage message) {
112         int payloadBufferSize = 0;
113 
114         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
115         MqttConnectVariableHeader variableHeader = message.variableHeader();
116         MqttConnectPayload payload = message.payload();
117         MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
118                 (byte) variableHeader.version());
119         setMqttVersion(ctx, mqttVersion);
120 
121         // MQTT 3.1 and 3.1.1 require the Password Flag to be 0 when the User Name Flag is 0.
122         if ((mqttVersion == MqttVersion.MQTT_3_1 || mqttVersion == MqttVersion.MQTT_3_1_1) &&
123                 !variableHeader.hasUserName() && variableHeader.hasPassword()) {
124             throw new EncoderException("Without a username, the password MUST be not set");
125         }
126 
127         // Client id
128         String clientIdentifier = payload.clientIdentifier();
129         if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
130             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
131         }
132         int clientIdentifierBytes = utf8Bytes(clientIdentifier);
133         payloadBufferSize += 2 + clientIdentifierBytes;
134 
135         // Will topic and message
136         String willTopic = payload.willTopic();
137         int willTopicBytes = nullableUtf8Bytes(willTopic);
138         byte[] willMessage = payload.willMessageInBytes();
139         byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
140         if (variableHeader.isWillFlag()) {
141             payloadBufferSize += 2 + willTopicBytes;
142             payloadBufferSize += 2 + willMessageBytes.length;
143         }
144 
145         String userName = payload.userName();
146         int userNameBytes = nullableUtf8Bytes(userName);
147         if (variableHeader.hasUserName()) {
148             payloadBufferSize += 2 + userNameBytes;
149         }
150 
151         byte[] password = payload.passwordInBytes();
152         byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
153         if (variableHeader.hasPassword()) {
154             payloadBufferSize += 2 + passwordBytes.length;
155         }
156 
157         // Fixed and variable header
158         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
159         ByteBuf propertiesBuf = encodePropertiesIfNeeded(
160                 mqttVersion,
161                 ctx.alloc(),
162                 message.variableHeader().properties());
163         try {
164             final ByteBuf willPropertiesBuf;
165             if (variableHeader.isWillFlag()) {
166                 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
167                 payloadBufferSize += willPropertiesBuf.readableBytes();
168             } else {
169                 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
170             }
171             try {
172                 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
173 
174                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
175                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
176                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
177                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
178                 writeVariableLengthInt(buf, variablePartSize);
179 
180                 buf.writeShort(protocolNameBytes.length);
181                 buf.writeBytes(protocolNameBytes);
182 
183                 buf.writeByte(variableHeader.version());
184                 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
185                 buf.writeShort(variableHeader.keepAliveTimeSeconds());
186                 buf.writeBytes(propertiesBuf);
187 
188                 // Payload
189                 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
190                 if (variableHeader.isWillFlag()) {
191                     buf.writeBytes(willPropertiesBuf);
192                     writeExactUTF8String(buf, willTopic, willTopicBytes);
193                     buf.writeShort(willMessageBytes.length);
194                     buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
195                 }
196                 if (variableHeader.hasUserName()) {
197                     writeExactUTF8String(buf, userName, userNameBytes);
198                 }
199                 if (variableHeader.hasPassword()) {
200                     buf.writeShort(passwordBytes.length);
201                     buf.writeBytes(passwordBytes, 0, passwordBytes.length);
202                 }
203                 return buf;
204             } finally {
205                 willPropertiesBuf.release();
206             }
207         } finally {
208             propertiesBuf.release();
209         }
210     }
211 
212     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
213         int flagByte = 0;
214         if (variableHeader.hasUserName()) {
215             flagByte |= 0x80;
216         }
217         if (variableHeader.hasPassword()) {
218             flagByte |= 0x40;
219         }
220         if (variableHeader.isWillRetain()) {
221             flagByte |= 0x20;
222         }
223         flagByte |= (variableHeader.willQos() & 0x03) << 3;
224         if (variableHeader.isWillFlag()) {
225             flagByte |= 0x04;
226         }
227         if (variableHeader.isCleanSession()) {
228             flagByte |= 0x02;
229         }
230         return flagByte;
231     }
232 
233     private static ByteBuf encodeConnAckMessage(
234             ChannelHandlerContext ctx,
235             MqttConnAckMessage message) {
236         final MqttVersion mqttVersion = getMqttVersion(ctx);
237         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
238                 ctx.alloc(),
239                 message.variableHeader().properties());
240 
241         try {
242             ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
243             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
244             writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
245             buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
246             buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
247             buf.writeBytes(propertiesBuf);
248             return buf;
249         } finally {
250             propertiesBuf.release();
251         }
252     }
253 
254     private static ByteBuf encodeSubscribeMessage(
255             ChannelHandlerContext ctx,
256             MqttSubscribeMessage message) {
257         MqttVersion mqttVersion = getMqttVersion(ctx);
258         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
259                 ctx.alloc(),
260                 message.idAndPropertiesVariableHeader().properties());
261 
262         try {
263             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
264             int payloadBufferSize = 0;
265 
266             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
267             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
268             MqttSubscribePayload payload = message.payload();
269 
270             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
271                 String topicName = topic.topicName();
272                 int topicNameBytes = utf8Bytes(topicName);
273                 payloadBufferSize += 2 + topicNameBytes;
274                 payloadBufferSize += 1;
275             }
276 
277             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
278             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
279 
280             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
281             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
282             writeVariableLengthInt(buf, variablePartSize);
283 
284             // Variable Header
285             int messageId = variableHeader.messageId();
286             buf.writeShort(messageId);
287             buf.writeBytes(propertiesBuf);
288 
289             // Payload
290             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
291                 writeEagerUTF8String(buf, topic.topicName());
292                 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
293                     buf.writeByte(topic.qualityOfService().value());
294                 } else {
295                     final MqttSubscriptionOption option = topic.option();
296 
297                     int optionEncoded = option.retainHandling().value() << 4;
298                     if (option.isRetainAsPublished()) {
299                         optionEncoded |= 0x08;
300                     }
301                     if (option.isNoLocal()) {
302                         optionEncoded |= 0x04;
303                     }
304                     optionEncoded |= option.qos().value();
305 
306                     buf.writeByte(optionEncoded);
307                 }
308             }
309 
310             return buf;
311         } finally {
312             propertiesBuf.release();
313         }
314     }
315 
316     private static ByteBuf encodeUnsubscribeMessage(
317             ChannelHandlerContext ctx,
318             MqttUnsubscribeMessage message) {
319         MqttVersion mqttVersion = getMqttVersion(ctx);
320         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
321                 ctx.alloc(),
322                 message.idAndPropertiesVariableHeader().properties());
323 
324         try {
325             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
326             int payloadBufferSize = 0;
327 
328             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
329             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
330             MqttUnsubscribePayload payload = message.payload();
331 
332             for (String topicName : payload.topics()) {
333                 int topicNameBytes = utf8Bytes(topicName);
334                 payloadBufferSize += 2 + topicNameBytes;
335             }
336 
337             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
338             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
339 
340             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
341             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
342             writeVariableLengthInt(buf, variablePartSize);
343 
344             // Variable Header
345             int messageId = variableHeader.messageId();
346             buf.writeShort(messageId);
347             buf.writeBytes(propertiesBuf);
348 
349             // Payload
350             for (String topicName : payload.topics()) {
351                 writeEagerUTF8String(buf, topicName);
352             }
353 
354             return buf;
355         } finally {
356             propertiesBuf.release();
357         }
358     }
359 
360     private static ByteBuf encodeSubAckMessage(
361             ChannelHandlerContext ctx,
362             MqttSubAckMessage message) {
363         MqttVersion mqttVersion = getMqttVersion(ctx);
364         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
365                 ctx.alloc(),
366                 message.idAndPropertiesVariableHeader().properties());
367         try {
368             int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
369             int payloadBufferSize = message.payload().grantedQoSLevels().size();
370             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
371             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
372             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
373             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
374             writeVariableLengthInt(buf, variablePartSize);
375             buf.writeShort(message.variableHeader().messageId());
376             buf.writeBytes(propertiesBuf);
377             for (int code: message.payload().reasonCodes()) {
378                 buf.writeByte(code);
379             }
380 
381             return buf;
382         } finally {
383             propertiesBuf.release();
384         }
385     }
386 
387     private static ByteBuf encodeUnsubAckMessage(
388             ChannelHandlerContext ctx,
389             MqttUnsubAckMessage message) {
390         if (message.variableHeader() instanceof  MqttMessageIdAndPropertiesVariableHeader) {
391             MqttVersion mqttVersion = getMqttVersion(ctx);
392             ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
393                     ctx.alloc(),
394                     message.idAndPropertiesVariableHeader().properties());
395             try {
396                 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
397                 MqttUnsubAckPayload payload = message.payload();
398                 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
399                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
400                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
401                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
402                 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
403                 writeVariableLengthInt(buf, variablePartSize);
404                 buf.writeShort(message.variableHeader().messageId());
405                 buf.writeBytes(propertiesBuf);
406 
407                 if (payload != null) {
408                     for (Short reasonCode : payload.unsubscribeReasonCodes()) {
409                         buf.writeByte(reasonCode);
410                     }
411                 }
412 
413                 return buf;
414             } finally {
415                 propertiesBuf.release();
416             }
417         } else {
418             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
419         }
420     }
421 
422     private static ByteBuf encodePublishMessage(
423             ChannelHandlerContext ctx,
424             MqttPublishMessage message) {
425         MqttVersion mqttVersion = getMqttVersion(ctx);
426         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
427         MqttPublishVariableHeader variableHeader = message.variableHeader();
428         ByteBuf payload = message.payload().duplicate();
429 
430         String topicName = variableHeader.topicName();
431         int topicNameBytes = utf8Bytes(topicName);
432 
433         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
434                 ctx.alloc(),
435                 message.variableHeader().properties());
436 
437         try {
438             int variableHeaderBufferSize = 2 + topicNameBytes +
439                     (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes();
440             int payloadBufferSize = payload.readableBytes();
441             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
442             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
443 
444             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
445             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
446             writeVariableLengthInt(buf, variablePartSize);
447             writeExactUTF8String(buf, topicName, topicNameBytes);
448             if (mqttFixedHeader.qosLevel().value() > 0) {
449                 buf.writeShort(variableHeader.packetId());
450             }
451             buf.writeBytes(propertiesBuf);
452             buf.writeBytes(payload);
453 
454             return buf;
455         } finally {
456             propertiesBuf.release();
457         }
458     }
459 
460     private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
461                                           MqttMessage message) {
462         if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
463             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
464             MqttPubReplyMessageVariableHeader variableHeader =
465                     (MqttPubReplyMessageVariableHeader) message.variableHeader();
466             int msgId = variableHeader.messageId();
467 
468             final ByteBuf propertiesBuf;
469             final boolean includeReasonCode;
470             final int variableHeaderBufferSize;
471             final MqttVersion mqttVersion = getMqttVersion(ctx);
472             if (mqttVersion == MqttVersion.MQTT_5 &&
473                     (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
474                             !variableHeader.properties().isEmpty())) {
475                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
476                 includeReasonCode = true;
477                 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
478             } else {
479                 propertiesBuf = Unpooled.EMPTY_BUFFER;
480                 includeReasonCode = false;
481                 variableHeaderBufferSize = 2;
482             }
483 
484             try {
485                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
486                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
487                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
488                 writeVariableLengthInt(buf, variableHeaderBufferSize);
489                 buf.writeShort(msgId);
490                 if (includeReasonCode) {
491                     buf.writeByte(variableHeader.reasonCode());
492                 }
493                 buf.writeBytes(propertiesBuf);
494 
495                 return buf;
496             } finally {
497                 propertiesBuf.release();
498             }
499         } else {
500             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
501         }
502     }
503 
504     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
505             ByteBufAllocator byteBufAllocator,
506             MqttMessage message) {
507         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
508         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
509         int msgId = variableHeader.messageId();
510 
511         int variableHeaderBufferSize = 2; // variable part only has a message id
512         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
513         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
514         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
515         writeVariableLengthInt(buf, variableHeaderBufferSize);
516         buf.writeShort(msgId);
517 
518         return buf;
519     }
520 
521     private static ByteBuf encodeReasonCodePlusPropertiesMessage(
522             ChannelHandlerContext ctx,
523             MqttMessage message) {
524         if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
525             MqttVersion mqttVersion = getMqttVersion(ctx);
526             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
527             MqttReasonCodeAndPropertiesVariableHeader variableHeader =
528                     (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
529 
530             final ByteBuf propertiesBuf;
531             final boolean includeReasonCode;
532             final int variableHeaderBufferSize;
533             if (mqttVersion == MqttVersion.MQTT_5 &&
534                     (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
535                             !variableHeader.properties().isEmpty())) {
536                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
537                 includeReasonCode = true;
538                 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
539             } else {
540                 propertiesBuf = Unpooled.EMPTY_BUFFER;
541                 includeReasonCode = false;
542                 variableHeaderBufferSize = 0;
543             }
544 
545             try {
546                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
547                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
548                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
549                 writeVariableLengthInt(buf, variableHeaderBufferSize);
550                 if (includeReasonCode) {
551                     buf.writeByte(variableHeader.reasonCode());
552                 }
553                 buf.writeBytes(propertiesBuf);
554 
555                 return buf;
556             } finally {
557                 propertiesBuf.release();
558             }
559         } else {
560             return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
561         }
562     }
563 
564     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
565             ByteBufAllocator byteBufAllocator,
566             MqttMessage message) {
567         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
568         ByteBuf buf = byteBufAllocator.buffer(2);
569         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
570         buf.writeByte(0);
571 
572         return buf;
573     }
574 
575     private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
576                                              ByteBufAllocator byteBufAllocator,
577                                              MqttProperties mqttProperties) {
578         if (mqttVersion == MqttVersion.MQTT_5) {
579             return encodeProperties(byteBufAllocator, mqttProperties);
580         }
581         return Unpooled.EMPTY_BUFFER;
582     }
583 
584     private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
585                                             MqttProperties mqttProperties) {
586         ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
587         // encode also the Properties part
588         try {
589             ByteBuf propertiesBuf = byteBufAllocator.buffer();
590             try {
591                 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
592                     MqttProperties.MqttPropertyType propertyType =
593                             MqttProperties.MqttPropertyType.valueOf(property.propertyId);
594                     switch (propertyType) {
595                         case PAYLOAD_FORMAT_INDICATOR:
596                         case REQUEST_PROBLEM_INFORMATION:
597                         case REQUEST_RESPONSE_INFORMATION:
598                         case MAXIMUM_QOS:
599                         case RETAIN_AVAILABLE:
600                         case WILDCARD_SUBSCRIPTION_AVAILABLE:
601                         case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
602                         case SHARED_SUBSCRIPTION_AVAILABLE:
603                             writeVariableLengthInt(propertiesBuf, property.propertyId);
604                             final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
605                             propertiesBuf.writeByte(bytePropValue);
606                             break;
607                         case SERVER_KEEP_ALIVE:
608                         case RECEIVE_MAXIMUM:
609                         case TOPIC_ALIAS_MAXIMUM:
610                         case TOPIC_ALIAS:
611                             writeVariableLengthInt(propertiesBuf, property.propertyId);
612                             final short twoBytesInPropValue =
613                                     ((MqttProperties.IntegerProperty) property).value.shortValue();
614                             propertiesBuf.writeShort(twoBytesInPropValue);
615                             break;
616                         case PUBLICATION_EXPIRY_INTERVAL:
617                         case SESSION_EXPIRY_INTERVAL:
618                         case WILL_DELAY_INTERVAL:
619                         case MAXIMUM_PACKET_SIZE:
620                             writeVariableLengthInt(propertiesBuf, property.propertyId);
621                             final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
622                             propertiesBuf.writeInt(fourBytesIntPropValue);
623                             break;
624                         case SUBSCRIPTION_IDENTIFIER:
625                             writeVariableLengthInt(propertiesBuf, property.propertyId);
626                             final int vbi = ((MqttProperties.IntegerProperty) property).value;
627                             writeVariableLengthInt(propertiesBuf, vbi);
628                             break;
629                         case CONTENT_TYPE:
630                         case RESPONSE_TOPIC:
631                         case ASSIGNED_CLIENT_IDENTIFIER:
632                         case AUTHENTICATION_METHOD:
633                         case RESPONSE_INFORMATION:
634                         case SERVER_REFERENCE:
635                         case REASON_STRING:
636                             writeVariableLengthInt(propertiesBuf, property.propertyId);
637                             writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
638                             break;
639                         case USER_PROPERTY:
640                             final List<MqttProperties.StringPair> pairs =
641                                     ((MqttProperties.UserProperties) property).value;
642                             for (MqttProperties.StringPair pair : pairs) {
643                                 writeVariableLengthInt(propertiesBuf, property.propertyId);
644                                 writeEagerUTF8String(propertiesBuf, pair.key);
645                                 writeEagerUTF8String(propertiesBuf, pair.value);
646                             }
647                             break;
648                         case CORRELATION_DATA:
649                         case AUTHENTICATION_DATA:
650                             writeVariableLengthInt(propertiesBuf, property.propertyId);
651                             final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
652                             propertiesBuf.writeShort(binaryPropValue.length);
653                             propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
654                             break;
655                         default:
656                             //shouldn't reach here
657                             throw new EncoderException("Unknown property type: " + propertyType);
658                     }
659                 }
660                 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
661                 propertiesHeaderBuf.writeBytes(propertiesBuf);
662 
663                 return propertiesHeaderBuf;
664             } finally {
665                 propertiesBuf.release();
666             }
667         } catch (RuntimeException e) {
668             propertiesHeaderBuf.release();
669             throw e;
670         }
671     }
672 
673     private static int getFixedHeaderByte1(MqttFixedHeader header) {
674         int ret = 0;
675         ret |= header.messageType().value() << 4;
676         if (header.isDup()) {
677             ret |= 0x08;
678         }
679         ret |= header.qosLevel().value() << 1;
680         if (header.isRetain()) {
681             ret |= 0x01;
682         }
683         return ret;
684     }
685 
686     private static void writeVariableLengthInt(ByteBuf buf, int num) {
687         do {
688             int digit = num % 128;
689             num /= 128;
690             if (num > 0) {
691                 digit |= 0x80;
692             }
693             buf.writeByte(digit);
694         } while (num > 0);
695     }
696 
697     private static int nullableUtf8Bytes(String s) {
698         return s == null? 0 : utf8Bytes(s);
699     }
700 
701     private static int nullableMaxUtf8Bytes(String s) {
702         return s == null? 0 : utf8MaxBytes(s);
703     }
704 
705     private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
706         buf.ensureWritable(utf8Length + 2);
707         buf.writeShort(utf8Length);
708         if (utf8Length > 0) {
709             final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
710             assert writtenUtf8Length == utf8Length;
711         }
712     }
713 
714     private static void writeEagerUTF8String(ByteBuf buf, String s) {
715         final int maxUtf8Length = nullableMaxUtf8Bytes(s);
716         buf.ensureWritable(maxUtf8Length + 2);
717         final int writerIndex = buf.writerIndex();
718         final int startUtf8String = writerIndex + 2;
719         buf.writerIndex(startUtf8String);
720         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
721         buf.setShort(writerIndex, utf8Length);
722     }
723 
724     private static int getVariableLengthInt(int num) {
725         int count = 0;
726         do {
727             num /= 128;
728             count++;
729         } while (num > 0);
730         return count;
731     }
732 
733 }