View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.handler.codec.EncoderException;
25  import io.netty.handler.codec.MessageToMessageEncoder;
26  import io.netty.util.internal.EmptyArrays;
27  
28  import java.util.List;
29  
30  import static io.netty.buffer.ByteBufUtil.*;
31  import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32  import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33  import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34  import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35  
36  /**
37   * Encodes Mqtt messages into bytes following the protocol specification v3.1
38   * as described here <a href="https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
39   * or v5.0 as described here <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">MQTTv5.0</a> -
40   * depending on the version specified in the first CONNECT message that goes through the channel.
41   */
42  @ChannelHandler.Sharable
43  public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
44  
45      public static final MqttEncoder INSTANCE = new MqttEncoder();
46  
47      private MqttEncoder() { }
48  
49      @Override
50      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
51          out.add(doEncode(ctx, msg));
52      }
53  
54      /**
55       * This is the main encoding method.
56       * It's only visible for testing.
57       *
58       * @param message MQTT message to encode
59       * @return ByteBuf with encoded bytes
60       */
61      static ByteBuf doEncode(ChannelHandlerContext ctx,
62                       MqttMessage message) {
63  
64          switch (message.fixedHeader().messageType()) {
65              case CONNECT:
66                  return encodeConnectMessage(ctx, (MqttConnectMessage) message);
67  
68              case CONNACK:
69                  return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
70  
71              case PUBLISH:
72                  return encodePublishMessage(ctx, (MqttPublishMessage) message);
73  
74              case SUBSCRIBE:
75                  return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
76  
77              case UNSUBSCRIBE:
78                  return encodeUnsubscribeMessage(ctx,  (MqttUnsubscribeMessage) message);
79  
80              case SUBACK:
81                  return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
82  
83              case UNSUBACK:
84                  if (message instanceof MqttUnsubAckMessage) {
85                      return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
86                  }
87                  return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
88  
89              case PUBACK:
90              case PUBREC:
91              case PUBREL:
92              case PUBCOMP:
93                  return encodePubReplyMessage(ctx, message);
94  
95              case DISCONNECT:
96              case AUTH:
97                  return encodeReasonCodePlusPropertiesMessage(ctx, message);
98  
99              case PINGREQ:
100             case PINGRESP:
101                 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
102 
103             default:
104                 throw new IllegalArgumentException(
105                         "Unknown message type: " + message.fixedHeader().messageType().value());
106         }
107     }
108 
109     private static ByteBuf encodeConnectMessage(
110             ChannelHandlerContext ctx,
111             MqttConnectMessage message) {
112         int payloadBufferSize = 0;
113 
114         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
115         MqttConnectVariableHeader variableHeader = message.variableHeader();
116         MqttConnectPayload payload = message.payload();
117         MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
118                 (byte) variableHeader.version());
119         setMqttVersion(ctx, mqttVersion);
120 
121         // as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
122         if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
123             throw new EncoderException("Without a username, the password MUST be not set");
124         }
125 
126         // Client id
127         String clientIdentifier = payload.clientIdentifier();
128         if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
129             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
130         }
131         int clientIdentifierBytes = utf8Bytes(clientIdentifier);
132         payloadBufferSize += 2 + clientIdentifierBytes;
133 
134         // Will topic and message
135         String willTopic = payload.willTopic();
136         int willTopicBytes = nullableUtf8Bytes(willTopic);
137         byte[] willMessage = payload.willMessageInBytes();
138         byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
139         if (variableHeader.isWillFlag()) {
140             payloadBufferSize += 2 + willTopicBytes;
141             payloadBufferSize += 2 + willMessageBytes.length;
142         }
143 
144         String userName = payload.userName();
145         int userNameBytes = nullableUtf8Bytes(userName);
146         if (variableHeader.hasUserName()) {
147             payloadBufferSize += 2 + userNameBytes;
148         }
149 
150         byte[] password = payload.passwordInBytes();
151         byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
152         if (variableHeader.hasPassword()) {
153             payloadBufferSize += 2 + passwordBytes.length;
154         }
155 
156         // Fixed and variable header
157         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
158         ByteBuf propertiesBuf = encodePropertiesIfNeeded(
159                 mqttVersion,
160                 ctx.alloc(),
161                 message.variableHeader().properties());
162         try {
163             final ByteBuf willPropertiesBuf;
164             if (variableHeader.isWillFlag()) {
165                 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
166                 payloadBufferSize += willPropertiesBuf.readableBytes();
167             } else {
168                 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
169             }
170             try {
171                 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
172 
173                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
174                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
175                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
176                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
177                 writeVariableLengthInt(buf, variablePartSize);
178 
179                 buf.writeShort(protocolNameBytes.length);
180                 buf.writeBytes(protocolNameBytes);
181 
182                 buf.writeByte(variableHeader.version());
183                 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
184                 buf.writeShort(variableHeader.keepAliveTimeSeconds());
185                 buf.writeBytes(propertiesBuf);
186 
187                 // Payload
188                 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
189                 if (variableHeader.isWillFlag()) {
190                     buf.writeBytes(willPropertiesBuf);
191                     writeExactUTF8String(buf, willTopic, willTopicBytes);
192                     buf.writeShort(willMessageBytes.length);
193                     buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
194                 }
195                 if (variableHeader.hasUserName()) {
196                     writeExactUTF8String(buf, userName, userNameBytes);
197                 }
198                 if (variableHeader.hasPassword()) {
199                     buf.writeShort(passwordBytes.length);
200                     buf.writeBytes(passwordBytes, 0, passwordBytes.length);
201                 }
202                 return buf;
203             } finally {
204                 willPropertiesBuf.release();
205             }
206         } finally {
207             propertiesBuf.release();
208         }
209     }
210 
211     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
212         int flagByte = 0;
213         if (variableHeader.hasUserName()) {
214             flagByte |= 0x80;
215         }
216         if (variableHeader.hasPassword()) {
217             flagByte |= 0x40;
218         }
219         if (variableHeader.isWillRetain()) {
220             flagByte |= 0x20;
221         }
222         flagByte |= (variableHeader.willQos() & 0x03) << 3;
223         if (variableHeader.isWillFlag()) {
224             flagByte |= 0x04;
225         }
226         if (variableHeader.isCleanSession()) {
227             flagByte |= 0x02;
228         }
229         return flagByte;
230     }
231 
232     private static ByteBuf encodeConnAckMessage(
233             ChannelHandlerContext ctx,
234             MqttConnAckMessage message) {
235         final MqttVersion mqttVersion = getMqttVersion(ctx);
236         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
237                 ctx.alloc(),
238                 message.variableHeader().properties());
239 
240         try {
241             ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
242             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
243             writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
244             buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
245             buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
246             buf.writeBytes(propertiesBuf);
247             return buf;
248         } finally {
249             propertiesBuf.release();
250         }
251     }
252 
253     private static ByteBuf encodeSubscribeMessage(
254             ChannelHandlerContext ctx,
255             MqttSubscribeMessage message) {
256         MqttVersion mqttVersion = getMqttVersion(ctx);
257         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
258                 ctx.alloc(),
259                 message.idAndPropertiesVariableHeader().properties());
260 
261         try {
262             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
263             int payloadBufferSize = 0;
264 
265             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
266             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
267             MqttSubscribePayload payload = message.payload();
268 
269             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
270                 String topicName = topic.topicName();
271                 int topicNameBytes = utf8Bytes(topicName);
272                 payloadBufferSize += 2 + topicNameBytes;
273                 payloadBufferSize += 1;
274             }
275 
276             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
277             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
278 
279             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
280             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
281             writeVariableLengthInt(buf, variablePartSize);
282 
283             // Variable Header
284             int messageId = variableHeader.messageId();
285             buf.writeShort(messageId);
286             buf.writeBytes(propertiesBuf);
287 
288             // Payload
289             for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
290                 writeUnsafeUTF8String(buf, topic.topicName());
291                 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
292                     buf.writeByte(topic.qualityOfService().value());
293                 } else {
294                     final MqttSubscriptionOption option = topic.option();
295 
296                     int optionEncoded = option.retainHandling().value() << 4;
297                     if (option.isRetainAsPublished()) {
298                         optionEncoded |= 0x08;
299                     }
300                     if (option.isNoLocal()) {
301                         optionEncoded |= 0x04;
302                     }
303                     optionEncoded |= option.qos().value();
304 
305                     buf.writeByte(optionEncoded);
306                 }
307             }
308 
309             return buf;
310         } finally {
311             propertiesBuf.release();
312         }
313     }
314 
315     private static ByteBuf encodeUnsubscribeMessage(
316             ChannelHandlerContext ctx,
317             MqttUnsubscribeMessage message) {
318         MqttVersion mqttVersion = getMqttVersion(ctx);
319         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
320                 ctx.alloc(),
321                 message.idAndPropertiesVariableHeader().properties());
322 
323         try {
324             final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
325             int payloadBufferSize = 0;
326 
327             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
328             MqttMessageIdVariableHeader variableHeader = message.variableHeader();
329             MqttUnsubscribePayload payload = message.payload();
330 
331             for (String topicName : payload.topics()) {
332                 int topicNameBytes = utf8Bytes(topicName);
333                 payloadBufferSize += 2 + topicNameBytes;
334             }
335 
336             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
337             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
338 
339             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
340             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
341             writeVariableLengthInt(buf, variablePartSize);
342 
343             // Variable Header
344             int messageId = variableHeader.messageId();
345             buf.writeShort(messageId);
346             buf.writeBytes(propertiesBuf);
347 
348             // Payload
349             for (String topicName : payload.topics()) {
350                 writeUnsafeUTF8String(buf, topicName);
351             }
352 
353             return buf;
354         } finally {
355             propertiesBuf.release();
356         }
357     }
358 
359     private static ByteBuf encodeSubAckMessage(
360             ChannelHandlerContext ctx,
361             MqttSubAckMessage message) {
362         MqttVersion mqttVersion = getMqttVersion(ctx);
363         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
364                 ctx.alloc(),
365                 message.idAndPropertiesVariableHeader().properties());
366         try {
367             int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
368             int payloadBufferSize = message.payload().grantedQoSLevels().size();
369             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
370             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
371             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
372             buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
373             writeVariableLengthInt(buf, variablePartSize);
374             buf.writeShort(message.variableHeader().messageId());
375             buf.writeBytes(propertiesBuf);
376             for (int code: message.payload().reasonCodes()) {
377                 buf.writeByte(code);
378             }
379 
380             return buf;
381         } finally {
382             propertiesBuf.release();
383         }
384     }
385 
386     private static ByteBuf encodeUnsubAckMessage(
387             ChannelHandlerContext ctx,
388             MqttUnsubAckMessage message) {
389         if (message.variableHeader() instanceof  MqttMessageIdAndPropertiesVariableHeader) {
390             MqttVersion mqttVersion = getMqttVersion(ctx);
391             ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
392                     ctx.alloc(),
393                     message.idAndPropertiesVariableHeader().properties());
394             try {
395                 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
396                 MqttUnsubAckPayload payload = message.payload();
397                 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
398                 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
399                 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
400                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
401                 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
402                 writeVariableLengthInt(buf, variablePartSize);
403                 buf.writeShort(message.variableHeader().messageId());
404                 buf.writeBytes(propertiesBuf);
405 
406                 if (payload != null) {
407                     for (Short reasonCode : payload.unsubscribeReasonCodes()) {
408                         buf.writeByte(reasonCode);
409                     }
410                 }
411 
412                 return buf;
413             } finally {
414                 propertiesBuf.release();
415             }
416         } else {
417             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
418         }
419     }
420 
421     private static ByteBuf encodePublishMessage(
422             ChannelHandlerContext ctx,
423             MqttPublishMessage message) {
424         MqttVersion mqttVersion = getMqttVersion(ctx);
425         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
426         MqttPublishVariableHeader variableHeader = message.variableHeader();
427         ByteBuf payload = message.payload().duplicate();
428 
429         String topicName = variableHeader.topicName();
430         int topicNameBytes = utf8Bytes(topicName);
431 
432         ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
433                 ctx.alloc(),
434                 message.variableHeader().properties());
435 
436         try {
437             int variableHeaderBufferSize = 2 + topicNameBytes +
438                     (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes();
439             int payloadBufferSize = payload.readableBytes();
440             int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
441             int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
442 
443             ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
444             buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
445             writeVariableLengthInt(buf, variablePartSize);
446             writeExactUTF8String(buf, topicName, topicNameBytes);
447             if (mqttFixedHeader.qosLevel().value() > 0) {
448                 buf.writeShort(variableHeader.packetId());
449             }
450             buf.writeBytes(propertiesBuf);
451             buf.writeBytes(payload);
452 
453             return buf;
454         } finally {
455             propertiesBuf.release();
456         }
457     }
458 
459     private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
460                                           MqttMessage message) {
461         if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
462             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
463             MqttPubReplyMessageVariableHeader variableHeader =
464                     (MqttPubReplyMessageVariableHeader) message.variableHeader();
465             int msgId = variableHeader.messageId();
466 
467             final ByteBuf propertiesBuf;
468             final boolean includeReasonCode;
469             final int variableHeaderBufferSize;
470             final MqttVersion mqttVersion = getMqttVersion(ctx);
471             if (mqttVersion == MqttVersion.MQTT_5 &&
472                     (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
473                             !variableHeader.properties().isEmpty())) {
474                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
475                 includeReasonCode = true;
476                 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
477             } else {
478                 propertiesBuf = Unpooled.EMPTY_BUFFER;
479                 includeReasonCode = false;
480                 variableHeaderBufferSize = 2;
481             }
482 
483             try {
484                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
485                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
486                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
487                 writeVariableLengthInt(buf, variableHeaderBufferSize);
488                 buf.writeShort(msgId);
489                 if (includeReasonCode) {
490                     buf.writeByte(variableHeader.reasonCode());
491                 }
492                 buf.writeBytes(propertiesBuf);
493 
494                 return buf;
495             } finally {
496                 propertiesBuf.release();
497             }
498         } else {
499             return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
500         }
501     }
502 
503     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
504             ByteBufAllocator byteBufAllocator,
505             MqttMessage message) {
506         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
507         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
508         int msgId = variableHeader.messageId();
509 
510         int variableHeaderBufferSize = 2; // variable part only has a message id
511         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
512         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
513         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
514         writeVariableLengthInt(buf, variableHeaderBufferSize);
515         buf.writeShort(msgId);
516 
517         return buf;
518     }
519 
520     private static ByteBuf encodeReasonCodePlusPropertiesMessage(
521             ChannelHandlerContext ctx,
522             MqttMessage message) {
523         if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
524             MqttVersion mqttVersion = getMqttVersion(ctx);
525             MqttFixedHeader mqttFixedHeader = message.fixedHeader();
526             MqttReasonCodeAndPropertiesVariableHeader variableHeader =
527                     (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
528 
529             final ByteBuf propertiesBuf;
530             final boolean includeReasonCode;
531             final int variableHeaderBufferSize;
532             if (mqttVersion == MqttVersion.MQTT_5 &&
533                     (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
534                             !variableHeader.properties().isEmpty())) {
535                 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
536                 includeReasonCode = true;
537                 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
538             } else {
539                 propertiesBuf = Unpooled.EMPTY_BUFFER;
540                 includeReasonCode = false;
541                 variableHeaderBufferSize = 0;
542             }
543 
544             try {
545                 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
546                 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
547                 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
548                 writeVariableLengthInt(buf, variableHeaderBufferSize);
549                 if (includeReasonCode) {
550                     buf.writeByte(variableHeader.reasonCode());
551                 }
552                 buf.writeBytes(propertiesBuf);
553 
554                 return buf;
555             } finally {
556                 propertiesBuf.release();
557             }
558         } else {
559             return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
560         }
561     }
562 
563     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
564             ByteBufAllocator byteBufAllocator,
565             MqttMessage message) {
566         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
567         ByteBuf buf = byteBufAllocator.buffer(2);
568         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
569         buf.writeByte(0);
570 
571         return buf;
572     }
573 
574     private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
575                                              ByteBufAllocator byteBufAllocator,
576                                              MqttProperties mqttProperties) {
577         if (mqttVersion == MqttVersion.MQTT_5) {
578             return encodeProperties(byteBufAllocator, mqttProperties);
579         }
580         return Unpooled.EMPTY_BUFFER;
581     }
582 
583     private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
584                                             MqttProperties mqttProperties) {
585         ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
586         // encode also the Properties part
587         try {
588             ByteBuf propertiesBuf = byteBufAllocator.buffer();
589             try {
590                 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
591                     MqttProperties.MqttPropertyType propertyType =
592                             MqttProperties.MqttPropertyType.valueOf(property.propertyId);
593                     switch (propertyType) {
594                         case PAYLOAD_FORMAT_INDICATOR:
595                         case REQUEST_PROBLEM_INFORMATION:
596                         case REQUEST_RESPONSE_INFORMATION:
597                         case MAXIMUM_QOS:
598                         case RETAIN_AVAILABLE:
599                         case WILDCARD_SUBSCRIPTION_AVAILABLE:
600                         case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
601                         case SHARED_SUBSCRIPTION_AVAILABLE:
602                             writeVariableLengthInt(propertiesBuf, property.propertyId);
603                             final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
604                             propertiesBuf.writeByte(bytePropValue);
605                             break;
606                         case SERVER_KEEP_ALIVE:
607                         case RECEIVE_MAXIMUM:
608                         case TOPIC_ALIAS_MAXIMUM:
609                         case TOPIC_ALIAS:
610                             writeVariableLengthInt(propertiesBuf, property.propertyId);
611                             final short twoBytesInPropValue =
612                                     ((MqttProperties.IntegerProperty) property).value.shortValue();
613                             propertiesBuf.writeShort(twoBytesInPropValue);
614                             break;
615                         case PUBLICATION_EXPIRY_INTERVAL:
616                         case SESSION_EXPIRY_INTERVAL:
617                         case WILL_DELAY_INTERVAL:
618                         case MAXIMUM_PACKET_SIZE:
619                             writeVariableLengthInt(propertiesBuf, property.propertyId);
620                             final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
621                             propertiesBuf.writeInt(fourBytesIntPropValue);
622                             break;
623                         case SUBSCRIPTION_IDENTIFIER:
624                             writeVariableLengthInt(propertiesBuf, property.propertyId);
625                             final int vbi = ((MqttProperties.IntegerProperty) property).value;
626                             writeVariableLengthInt(propertiesBuf, vbi);
627                             break;
628                         case CONTENT_TYPE:
629                         case RESPONSE_TOPIC:
630                         case ASSIGNED_CLIENT_IDENTIFIER:
631                         case AUTHENTICATION_METHOD:
632                         case RESPONSE_INFORMATION:
633                         case SERVER_REFERENCE:
634                         case REASON_STRING:
635                             writeVariableLengthInt(propertiesBuf, property.propertyId);
636                             writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
637                             break;
638                         case USER_PROPERTY:
639                             final List<MqttProperties.StringPair> pairs =
640                                     ((MqttProperties.UserProperties) property).value;
641                             for (MqttProperties.StringPair pair : pairs) {
642                                 writeVariableLengthInt(propertiesBuf, property.propertyId);
643                                 writeEagerUTF8String(propertiesBuf, pair.key);
644                                 writeEagerUTF8String(propertiesBuf, pair.value);
645                             }
646                             break;
647                         case CORRELATION_DATA:
648                         case AUTHENTICATION_DATA:
649                             writeVariableLengthInt(propertiesBuf, property.propertyId);
650                             final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
651                             propertiesBuf.writeShort(binaryPropValue.length);
652                             propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
653                             break;
654                         default:
655                             //shouldn't reach here
656                             throw new EncoderException("Unknown property type: " + propertyType);
657                     }
658                 }
659                 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
660                 propertiesHeaderBuf.writeBytes(propertiesBuf);
661 
662                 return propertiesHeaderBuf;
663             } finally {
664                 propertiesBuf.release();
665             }
666         } catch (RuntimeException e) {
667             propertiesHeaderBuf.release();
668             throw e;
669         }
670     }
671 
672     private static int getFixedHeaderByte1(MqttFixedHeader header) {
673         int ret = 0;
674         ret |= header.messageType().value() << 4;
675         if (header.isDup()) {
676             ret |= 0x08;
677         }
678         ret |= header.qosLevel().value() << 1;
679         if (header.isRetain()) {
680             ret |= 0x01;
681         }
682         return ret;
683     }
684 
685     private static void writeVariableLengthInt(ByteBuf buf, int num) {
686         do {
687             int digit = num % 128;
688             num /= 128;
689             if (num > 0) {
690                 digit |= 0x80;
691             }
692             buf.writeByte(digit);
693         } while (num > 0);
694     }
695 
696     private static int nullableUtf8Bytes(String s) {
697         return s == null? 0 : utf8Bytes(s);
698     }
699 
700     private static int nullableMaxUtf8Bytes(String s) {
701         return s == null? 0 : utf8MaxBytes(s);
702     }
703 
704     private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
705         buf.ensureWritable(utf8Length + 2);
706         buf.writeShort(utf8Length);
707         if (utf8Length > 0) {
708             final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
709             assert writtenUtf8Length == utf8Length;
710         }
711     }
712 
713     private static void writeEagerUTF8String(ByteBuf buf, String s) {
714         final int maxUtf8Length = nullableMaxUtf8Bytes(s);
715         buf.ensureWritable(maxUtf8Length + 2);
716         final int writerIndex = buf.writerIndex();
717         final int startUtf8String = writerIndex + 2;
718         buf.writerIndex(startUtf8String);
719         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
720         buf.setShort(writerIndex, utf8Length);
721     }
722 
723     private static void writeUnsafeUTF8String(ByteBuf buf, String s) {
724         final int writerIndex = buf.writerIndex();
725         final int startUtf8String = writerIndex + 2;
726         // no need to reserve any capacity here, already done earlier: that's why is Unsafe
727         buf.writerIndex(startUtf8String);
728         final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, 0) : 0;
729         buf.setShort(writerIndex, utf8Length);
730     }
731 
732     private static int getVariableLengthInt(int num) {
733         int count = 0;
734         do {
735             num /= 128;
736             count++;
737         } while (num > 0);
738         return count;
739     }
740 
741 }