1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.handler.codec.EncoderException;
25 import io.netty.handler.codec.MessageToMessageEncoder;
26 import io.netty.util.internal.EmptyArrays;
27
28 import java.util.List;
29
30 import static io.netty.buffer.ByteBufUtil.*;
31 import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35 import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
36 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
37 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
38 import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
39 import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
40 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
41 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
42 import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
43 import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
44 import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
45 import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
46 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
47 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
48 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
49 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
50 import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
51 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
52 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
53 import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
54 import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
55 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
56 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
57 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
58 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
59 import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
60 import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
61 import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
62
63
64
65
66
67
68
69 @ChannelHandler.Sharable
70 public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
71
72 public static final MqttEncoder INSTANCE = new MqttEncoder();
73
74 private MqttEncoder() {
75 super(MqttMessage.class);
76 }
77
78 @Override
79 protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
80 out.add(doEncode(ctx, msg));
81 }
82
83
84
85
86
87
88
89
90 static ByteBuf doEncode(ChannelHandlerContext ctx,
91 MqttMessage message) {
92
93 switch (message.fixedHeader().messageType()) {
94 case CONNECT:
95 return encodeConnectMessage(ctx, (MqttConnectMessage) message);
96
97 case CONNACK:
98 return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
99
100 case PUBLISH:
101 return encodePublishMessage(ctx, (MqttPublishMessage) message);
102
103 case SUBSCRIBE:
104 return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
105
106 case UNSUBSCRIBE:
107 return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
108
109 case SUBACK:
110 return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
111
112 case UNSUBACK:
113 if (message instanceof MqttUnsubAckMessage) {
114 return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
115 }
116 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
117
118 case PUBACK:
119 case PUBREC:
120 case PUBREL:
121 case PUBCOMP:
122 return encodePubReplyMessage(ctx, message);
123
124 case DISCONNECT:
125 case AUTH:
126 return encodeReasonCodePlusPropertiesMessage(ctx, message);
127
128 case PINGREQ:
129 case PINGRESP:
130 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
131
132 default:
133 throw new IllegalArgumentException(
134 "Unknown message type: " + message.fixedHeader().messageType().value());
135 }
136 }
137
138 private static ByteBuf encodeConnectMessage(
139 ChannelHandlerContext ctx,
140 MqttConnectMessage message) {
141 int payloadBufferSize = 0;
142
143 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
144 MqttConnectVariableHeader variableHeader = message.variableHeader();
145 MqttConnectPayload payload = message.payload();
146 MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
147 (byte) variableHeader.version());
148 setMqttVersion(ctx, mqttVersion);
149
150
151 if ((mqttVersion == MqttVersion.MQTT_3_1 || mqttVersion == MqttVersion.MQTT_3_1_1) &&
152 !variableHeader.hasUserName() && variableHeader.hasPassword()) {
153 throw new EncoderException("Without a username, the password MUST be not set");
154 }
155
156
157 String clientIdentifier = payload.clientIdentifier();
158 if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
159 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
160 }
161 int clientIdentifierBytes = utf8Bytes(clientIdentifier);
162 payloadBufferSize += 2 + clientIdentifierBytes;
163
164
165 String willTopic = payload.willTopic();
166 int willTopicBytes = nullableUtf8Bytes(willTopic);
167 byte[] willMessage = payload.willMessageInBytes();
168 byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
169 if (variableHeader.isWillFlag()) {
170 payloadBufferSize += 2 + willTopicBytes;
171 payloadBufferSize += 2 + willMessageBytes.length;
172 }
173
174 String userName = payload.userName();
175 int userNameBytes = nullableUtf8Bytes(userName);
176 if (variableHeader.hasUserName()) {
177 payloadBufferSize += 2 + userNameBytes;
178 }
179
180 byte[] password = payload.passwordInBytes();
181 byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
182 if (variableHeader.hasPassword()) {
183 payloadBufferSize += 2 + passwordBytes.length;
184 }
185
186
187 byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
188 ByteBuf propertiesBuf = encodePropertiesIfNeeded(
189 mqttVersion,
190 ctx.alloc(),
191 message.variableHeader().properties());
192 try {
193 final ByteBuf willPropertiesBuf;
194 if (variableHeader.isWillFlag()) {
195 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
196 payloadBufferSize += willPropertiesBuf.readableBytes();
197 } else {
198 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
199 }
200 try {
201 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
202
203 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
204 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
205 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
206 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
207 writeVariableLengthInt(buf, variablePartSize);
208
209 buf.writeShort(protocolNameBytes.length);
210 buf.writeBytes(protocolNameBytes);
211
212 buf.writeByte(variableHeader.version());
213 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
214 buf.writeShort(variableHeader.keepAliveTimeSeconds());
215 buf.writeBytes(propertiesBuf);
216
217
218 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
219 if (variableHeader.isWillFlag()) {
220 buf.writeBytes(willPropertiesBuf);
221 writeExactUTF8String(buf, willTopic, willTopicBytes);
222 buf.writeShort(willMessageBytes.length);
223 buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
224 }
225 if (variableHeader.hasUserName()) {
226 writeExactUTF8String(buf, userName, userNameBytes);
227 }
228 if (variableHeader.hasPassword()) {
229 buf.writeShort(passwordBytes.length);
230 buf.writeBytes(passwordBytes, 0, passwordBytes.length);
231 }
232 return buf;
233 } finally {
234 willPropertiesBuf.release();
235 }
236 } finally {
237 propertiesBuf.release();
238 }
239 }
240
241 private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
242 int flagByte = 0;
243 if (variableHeader.hasUserName()) {
244 flagByte |= 0x80;
245 }
246 if (variableHeader.hasPassword()) {
247 flagByte |= 0x40;
248 }
249 if (variableHeader.isWillRetain()) {
250 flagByte |= 0x20;
251 }
252 flagByte |= (variableHeader.willQos() & 0x03) << 3;
253 if (variableHeader.isWillFlag()) {
254 flagByte |= 0x04;
255 }
256 if (variableHeader.isCleanSession()) {
257 flagByte |= 0x02;
258 }
259 return flagByte;
260 }
261
262 private static ByteBuf encodeConnAckMessage(
263 ChannelHandlerContext ctx,
264 MqttConnAckMessage message) {
265 final MqttVersion mqttVersion = getMqttVersion(ctx);
266 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
267 ctx.alloc(),
268 message.variableHeader().properties());
269
270 try {
271 ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
272 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
273 writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
274 buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
275 buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
276 buf.writeBytes(propertiesBuf);
277 return buf;
278 } finally {
279 propertiesBuf.release();
280 }
281 }
282
283 private static ByteBuf encodeSubscribeMessage(
284 ChannelHandlerContext ctx,
285 MqttSubscribeMessage message) {
286 MqttVersion mqttVersion = getMqttVersion(ctx);
287 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
288 ctx.alloc(),
289 message.idAndPropertiesVariableHeader().properties());
290
291 try {
292 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
293 int payloadBufferSize = 0;
294
295 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
296 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
297 MqttSubscribePayload payload = message.payload();
298
299 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
300 String topicName = topic.topicName();
301 int topicNameBytes = utf8Bytes(topicName);
302 payloadBufferSize += 2 + topicNameBytes;
303 payloadBufferSize += 1;
304 }
305
306 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
307 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
308
309 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
310 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
311 writeVariableLengthInt(buf, variablePartSize);
312
313
314 int messageId = variableHeader.messageId();
315 buf.writeShort(messageId);
316 buf.writeBytes(propertiesBuf);
317
318
319 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
320 writeEagerUTF8String(buf, topic.topicName());
321 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
322 buf.writeByte(topic.qualityOfService().value());
323 } else {
324 final MqttSubscriptionOption option = topic.option();
325
326 int optionEncoded = option.retainHandling().value() << 4;
327 if (option.isRetainAsPublished()) {
328 optionEncoded |= 0x08;
329 }
330 if (option.isNoLocal()) {
331 optionEncoded |= 0x04;
332 }
333 optionEncoded |= option.qos().value();
334
335 buf.writeByte(optionEncoded);
336 }
337 }
338
339 return buf;
340 } finally {
341 propertiesBuf.release();
342 }
343 }
344
345 private static ByteBuf encodeUnsubscribeMessage(
346 ChannelHandlerContext ctx,
347 MqttUnsubscribeMessage message) {
348 MqttVersion mqttVersion = getMqttVersion(ctx);
349 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
350 ctx.alloc(),
351 message.idAndPropertiesVariableHeader().properties());
352
353 try {
354 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
355 int payloadBufferSize = 0;
356
357 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
358 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
359 MqttUnsubscribePayload payload = message.payload();
360
361 for (String topicName : payload.topics()) {
362 int topicNameBytes = utf8Bytes(topicName);
363 payloadBufferSize += 2 + topicNameBytes;
364 }
365
366 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
367 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
368
369 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
370 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
371 writeVariableLengthInt(buf, variablePartSize);
372
373
374 int messageId = variableHeader.messageId();
375 buf.writeShort(messageId);
376 buf.writeBytes(propertiesBuf);
377
378
379 for (String topicName : payload.topics()) {
380 writeEagerUTF8String(buf, topicName);
381 }
382
383 return buf;
384 } finally {
385 propertiesBuf.release();
386 }
387 }
388
389 private static ByteBuf encodeSubAckMessage(
390 ChannelHandlerContext ctx,
391 MqttSubAckMessage message) {
392 MqttVersion mqttVersion = getMqttVersion(ctx);
393 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
394 ctx.alloc(),
395 message.idAndPropertiesVariableHeader().properties());
396 try {
397 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
398 int payloadBufferSize = message.payload().grantedQoSLevels().size();
399 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
400 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
401 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
402 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
403 writeVariableLengthInt(buf, variablePartSize);
404 buf.writeShort(message.variableHeader().messageId());
405 buf.writeBytes(propertiesBuf);
406 for (int code: message.payload().reasonCodes()) {
407 buf.writeByte(code);
408 }
409
410 return buf;
411 } finally {
412 propertiesBuf.release();
413 }
414 }
415
416 private static ByteBuf encodeUnsubAckMessage(
417 ChannelHandlerContext ctx,
418 MqttUnsubAckMessage message) {
419 if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
420 MqttVersion mqttVersion = getMqttVersion(ctx);
421 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
422 ctx.alloc(),
423 message.idAndPropertiesVariableHeader().properties());
424 try {
425 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
426 MqttUnsubAckPayload payload = message.payload();
427 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
428 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
429 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
430 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
431 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
432 writeVariableLengthInt(buf, variablePartSize);
433 buf.writeShort(message.variableHeader().messageId());
434 buf.writeBytes(propertiesBuf);
435
436 if (payload != null) {
437 for (Short reasonCode : payload.unsubscribeReasonCodes()) {
438 buf.writeByte(reasonCode);
439 }
440 }
441
442 return buf;
443 } finally {
444 propertiesBuf.release();
445 }
446 } else {
447 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
448 }
449 }
450
451 private static ByteBuf encodePublishMessage(
452 ChannelHandlerContext ctx,
453 MqttPublishMessage message) {
454 MqttVersion mqttVersion = getMqttVersion(ctx);
455 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
456 MqttPublishVariableHeader variableHeader = message.variableHeader();
457 ByteBuf payload = message.payload().duplicate();
458
459 String topicName = variableHeader.topicName();
460 int topicNameBytes = utf8Bytes(topicName);
461
462 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
463 ctx.alloc(),
464 message.variableHeader().properties());
465
466 try {
467 boolean qosLevelGreaterZero = mqttFixedHeader.qosLevel().value() > 0;
468 int variableHeaderBufferSize = 2 + topicNameBytes +
469 (qosLevelGreaterZero ? 2 : 0) + propertiesBuf.readableBytes();
470 int payloadBufferSize = payload.readableBytes();
471 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
472 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
473
474 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
475 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
476 writeVariableLengthInt(buf, variablePartSize);
477 writeExactUTF8String(buf, topicName, topicNameBytes);
478 if (qosLevelGreaterZero) {
479 buf.writeShort(variableHeader.packetId());
480 }
481 buf.writeBytes(propertiesBuf);
482 buf.writeBytes(payload);
483
484 return buf;
485 } finally {
486 propertiesBuf.release();
487 }
488 }
489
490 private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
491 MqttMessage message) {
492 if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
493 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
494 MqttPubReplyMessageVariableHeader variableHeader =
495 (MqttPubReplyMessageVariableHeader) message.variableHeader();
496 int msgId = variableHeader.messageId();
497
498 final ByteBuf propertiesBuf;
499 final boolean includeReasonCode;
500 final int variableHeaderBufferSize;
501 final MqttVersion mqttVersion = getMqttVersion(ctx);
502 if (mqttVersion == MqttVersion.MQTT_5 &&
503 (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
504 !variableHeader.properties().isEmpty())) {
505 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
506 includeReasonCode = true;
507 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
508 } else {
509 propertiesBuf = Unpooled.EMPTY_BUFFER;
510 includeReasonCode = false;
511 variableHeaderBufferSize = 2;
512 }
513
514 try {
515 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
516 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
517 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
518 writeVariableLengthInt(buf, variableHeaderBufferSize);
519 buf.writeShort(msgId);
520 if (includeReasonCode) {
521 buf.writeByte(variableHeader.reasonCode());
522 }
523 buf.writeBytes(propertiesBuf);
524
525 return buf;
526 } finally {
527 propertiesBuf.release();
528 }
529 } else {
530 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
531 }
532 }
533
534 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
535 ByteBufAllocator byteBufAllocator,
536 MqttMessage message) {
537 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
538 MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
539 int msgId = variableHeader.messageId();
540
541 int variableHeaderBufferSize = 2;
542 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
543 ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
544 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
545 writeVariableLengthInt(buf, variableHeaderBufferSize);
546 buf.writeShort(msgId);
547
548 return buf;
549 }
550
551 private static ByteBuf encodeReasonCodePlusPropertiesMessage(
552 ChannelHandlerContext ctx,
553 MqttMessage message) {
554 if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
555 MqttVersion mqttVersion = getMqttVersion(ctx);
556 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
557 MqttReasonCodeAndPropertiesVariableHeader variableHeader =
558 (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
559
560 final ByteBuf propertiesBuf;
561 final boolean includeReasonCode;
562 final int variableHeaderBufferSize;
563 if (mqttVersion == MqttVersion.MQTT_5 &&
564 (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
565 !variableHeader.properties().isEmpty())) {
566 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
567 includeReasonCode = true;
568 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
569 } else {
570 propertiesBuf = Unpooled.EMPTY_BUFFER;
571 includeReasonCode = false;
572 variableHeaderBufferSize = 0;
573 }
574
575 try {
576 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
577 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
578 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
579 writeVariableLengthInt(buf, variableHeaderBufferSize);
580 if (includeReasonCode) {
581 buf.writeByte(variableHeader.reasonCode());
582 }
583 buf.writeBytes(propertiesBuf);
584
585 return buf;
586 } finally {
587 propertiesBuf.release();
588 }
589 } else {
590 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
591 }
592 }
593
594 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
595 ByteBufAllocator byteBufAllocator,
596 MqttMessage message) {
597 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
598 ByteBuf buf = byteBufAllocator.buffer(2);
599 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
600 buf.writeByte(0);
601
602 return buf;
603 }
604
605 private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
606 ByteBufAllocator byteBufAllocator,
607 MqttProperties mqttProperties) {
608 if (mqttVersion == MqttVersion.MQTT_5) {
609 return encodeProperties(byteBufAllocator, mqttProperties);
610 }
611 return Unpooled.EMPTY_BUFFER;
612 }
613
614 private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
615 MqttProperties mqttProperties) {
616 ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
617
618 try {
619 ByteBuf propertiesBuf = byteBufAllocator.buffer();
620 try {
621 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
622 int propertyId = property.propertyId;
623 switch (propertyId) {
624 case PAYLOAD_FORMAT_INDICATOR:
625 case REQUEST_PROBLEM_INFORMATION:
626 case REQUEST_RESPONSE_INFORMATION:
627 case MAXIMUM_QOS:
628 case RETAIN_AVAILABLE:
629 case WILDCARD_SUBSCRIPTION_AVAILABLE:
630 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
631 case SHARED_SUBSCRIPTION_AVAILABLE:
632 writeVariableLengthInt(propertiesBuf, propertyId);
633 final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
634 propertiesBuf.writeByte(bytePropValue);
635 break;
636 case SERVER_KEEP_ALIVE:
637 case RECEIVE_MAXIMUM:
638 case TOPIC_ALIAS_MAXIMUM:
639 case TOPIC_ALIAS:
640 writeVariableLengthInt(propertiesBuf, propertyId);
641 final short twoBytesInPropValue =
642 ((MqttProperties.IntegerProperty) property).value.shortValue();
643 propertiesBuf.writeShort(twoBytesInPropValue);
644 break;
645 case PUBLICATION_EXPIRY_INTERVAL:
646 case SESSION_EXPIRY_INTERVAL:
647 case WILL_DELAY_INTERVAL:
648 case MAXIMUM_PACKET_SIZE:
649 writeVariableLengthInt(propertiesBuf, propertyId);
650 final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
651 propertiesBuf.writeInt(fourBytesIntPropValue);
652 break;
653 case SUBSCRIPTION_IDENTIFIER:
654 writeVariableLengthInt(propertiesBuf, propertyId);
655 final int vbi = ((MqttProperties.IntegerProperty) property).value;
656 writeVariableLengthInt(propertiesBuf, vbi);
657 break;
658 case CONTENT_TYPE:
659 case RESPONSE_TOPIC:
660 case ASSIGNED_CLIENT_IDENTIFIER:
661 case AUTHENTICATION_METHOD:
662 case RESPONSE_INFORMATION:
663 case SERVER_REFERENCE:
664 case REASON_STRING:
665 writeVariableLengthInt(propertiesBuf, propertyId);
666 writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
667 break;
668 case USER_PROPERTY:
669 final List<MqttProperties.StringPair> pairs =
670 ((MqttProperties.UserProperties) property).value;
671 for (MqttProperties.StringPair pair : pairs) {
672 writeVariableLengthInt(propertiesBuf, propertyId);
673 writeEagerUTF8String(propertiesBuf, pair.key);
674 writeEagerUTF8String(propertiesBuf, pair.value);
675 }
676 break;
677 case CORRELATION_DATA:
678 case AUTHENTICATION_DATA:
679 writeVariableLengthInt(propertiesBuf, propertyId);
680 final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
681 propertiesBuf.writeShort(binaryPropValue.length);
682 propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
683 break;
684 default:
685
686 throw new EncoderException("Unknown property type: " + propertyId);
687 }
688 }
689 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
690 propertiesHeaderBuf.writeBytes(propertiesBuf);
691
692 return propertiesHeaderBuf;
693 } finally {
694 propertiesBuf.release();
695 }
696 } catch (RuntimeException e) {
697 propertiesHeaderBuf.release();
698 throw e;
699 }
700 }
701
702 private static int getFixedHeaderByte1(MqttFixedHeader header) {
703 int ret = 0;
704 ret |= header.messageType().value() << 4;
705 if (header.isDup()) {
706 ret |= 0x08;
707 }
708 ret |= header.qosLevel().value() << 1;
709 if (header.isRetain()) {
710 ret |= 0x01;
711 }
712 return ret;
713 }
714
715 private static void writeVariableLengthInt(ByteBuf buf, int num) {
716 do {
717 int digit = num & 0x7F;
718 num >>>= 7;
719 if (num > 0) {
720 digit |= 0x80;
721 }
722 buf.writeByte(digit);
723 } while (num > 0);
724 }
725
726 private static int nullableUtf8Bytes(String s) {
727 return s == null? 0 : utf8Bytes(s);
728 }
729
730 private static int nullableMaxUtf8Bytes(String s) {
731 return s == null? 0 : utf8MaxBytes(s);
732 }
733
734 private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
735 buf.ensureWritable(utf8Length + 2);
736 buf.writeShort(utf8Length);
737 if (utf8Length > 0) {
738 final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
739 assert writtenUtf8Length == utf8Length;
740 }
741 }
742
743 private static void writeEagerUTF8String(ByteBuf buf, String s) {
744 final int maxUtf8Length = nullableMaxUtf8Bytes(s);
745 buf.ensureWritable(maxUtf8Length + 2);
746 final int writerIndex = buf.writerIndex();
747 final int startUtf8String = writerIndex + 2;
748 buf.writerIndex(startUtf8String);
749 final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
750 buf.setShort(writerIndex, utf8Length);
751 }
752
753 private static int getVariableLengthInt(int num) {
754 if (num < 128) {
755 return 1;
756 }
757 if (num < 16_384) {
758 return 2;
759 }
760 if (num < 2_097_152) {
761 return 3;
762 }
763 return 4;
764 }
765
766 }