1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.handler.codec.EncoderException;
25 import io.netty.handler.codec.MessageToMessageEncoder;
26 import io.netty.util.internal.EmptyArrays;
27
28 import java.util.List;
29
30 import static io.netty.buffer.ByteBufUtil.*;
31 import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35 import static io.netty.handler.codec.mqtt.MqttProperties.ASSIGNED_CLIENT_IDENTIFIER;
36 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_DATA;
37 import static io.netty.handler.codec.mqtt.MqttProperties.AUTHENTICATION_METHOD;
38 import static io.netty.handler.codec.mqtt.MqttProperties.CONTENT_TYPE;
39 import static io.netty.handler.codec.mqtt.MqttProperties.CORRELATION_DATA;
40 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_PACKET_SIZE;
41 import static io.netty.handler.codec.mqtt.MqttProperties.MAXIMUM_QOS;
42 import static io.netty.handler.codec.mqtt.MqttProperties.PAYLOAD_FORMAT_INDICATOR;
43 import static io.netty.handler.codec.mqtt.MqttProperties.PUBLICATION_EXPIRY_INTERVAL;
44 import static io.netty.handler.codec.mqtt.MqttProperties.REASON_STRING;
45 import static io.netty.handler.codec.mqtt.MqttProperties.RECEIVE_MAXIMUM;
46 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_PROBLEM_INFORMATION;
47 import static io.netty.handler.codec.mqtt.MqttProperties.REQUEST_RESPONSE_INFORMATION;
48 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_INFORMATION;
49 import static io.netty.handler.codec.mqtt.MqttProperties.RESPONSE_TOPIC;
50 import static io.netty.handler.codec.mqtt.MqttProperties.RETAIN_AVAILABLE;
51 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_KEEP_ALIVE;
52 import static io.netty.handler.codec.mqtt.MqttProperties.SERVER_REFERENCE;
53 import static io.netty.handler.codec.mqtt.MqttProperties.SESSION_EXPIRY_INTERVAL;
54 import static io.netty.handler.codec.mqtt.MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE;
55 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER;
56 import static io.netty.handler.codec.mqtt.MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
57 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS;
58 import static io.netty.handler.codec.mqtt.MqttProperties.TOPIC_ALIAS_MAXIMUM;
59 import static io.netty.handler.codec.mqtt.MqttProperties.USER_PROPERTY;
60 import static io.netty.handler.codec.mqtt.MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE;
61 import static io.netty.handler.codec.mqtt.MqttProperties.WILL_DELAY_INTERVAL;
62
63
64
65
66
67
68
69 @ChannelHandler.Sharable
70 public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
71
/**
 * Shared encoder instance. The only constructor of this class is private, so this
 * constant is how callers obtain an encoder.
 */
public static final MqttEncoder INSTANCE = new MqttEncoder();

/**
 * Private constructor; passes {@code MqttMessage.class} to the superclass constructor.
 */
private MqttEncoder() {
    super(MqttMessage.class);
}
77
78 @Override
79 protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
80 out.add(doEncode(ctx, msg));
81 }
82
83
84
85
86
87
88
89
90 static ByteBuf doEncode(ChannelHandlerContext ctx,
91 MqttMessage message) {
92
93 switch (message.fixedHeader().messageType()) {
94 case CONNECT:
95 return encodeConnectMessage(ctx, (MqttConnectMessage) message);
96
97 case CONNACK:
98 return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
99
100 case PUBLISH:
101 return encodePublishMessage(ctx, (MqttPublishMessage) message);
102
103 case SUBSCRIBE:
104 return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
105
106 case UNSUBSCRIBE:
107 return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
108
109 case SUBACK:
110 return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
111
112 case UNSUBACK:
113 if (message instanceof MqttUnsubAckMessage) {
114 return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
115 }
116 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
117
118 case PUBACK:
119 case PUBREC:
120 case PUBREL:
121 case PUBCOMP:
122 return encodePubReplyMessage(ctx, message);
123
124 case DISCONNECT:
125 case AUTH:
126 return encodeReasonCodePlusPropertiesMessage(ctx, message);
127
128 case PINGREQ:
129 case PINGRESP:
130 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
131
132 default:
133 throw new IllegalArgumentException(
134 "Unknown message type: " + message.fixedHeader().messageType().value());
135 }
136 }
137
138 private static ByteBuf encodeConnectMessage(
139 ChannelHandlerContext ctx,
140 MqttConnectMessage message) {
141 int payloadBufferSize = 0;
142
143 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
144 MqttConnectVariableHeader variableHeader = message.variableHeader();
145 MqttConnectPayload payload = message.payload();
146 MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
147 (byte) variableHeader.version());
148 setMqttVersion(ctx, mqttVersion);
149
150
151 if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
152 throw new EncoderException("Without a username, the password MUST be not set");
153 }
154
155
156 String clientIdentifier = payload.clientIdentifier();
157 if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
158 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
159 }
160 int clientIdentifierBytes = utf8Bytes(clientIdentifier);
161 payloadBufferSize += 2 + clientIdentifierBytes;
162
163
164 String willTopic = payload.willTopic();
165 int willTopicBytes = nullableUtf8Bytes(willTopic);
166 byte[] willMessage = payload.willMessageInBytes();
167 byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
168 if (variableHeader.isWillFlag()) {
169 payloadBufferSize += 2 + willTopicBytes;
170 payloadBufferSize += 2 + willMessageBytes.length;
171 }
172
173 String userName = payload.userName();
174 int userNameBytes = nullableUtf8Bytes(userName);
175 if (variableHeader.hasUserName()) {
176 payloadBufferSize += 2 + userNameBytes;
177 }
178
179 byte[] password = payload.passwordInBytes();
180 byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
181 if (variableHeader.hasPassword()) {
182 payloadBufferSize += 2 + passwordBytes.length;
183 }
184
185
186 byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
187 ByteBuf propertiesBuf = encodePropertiesIfNeeded(
188 mqttVersion,
189 ctx.alloc(),
190 message.variableHeader().properties());
191 try {
192 final ByteBuf willPropertiesBuf;
193 if (variableHeader.isWillFlag()) {
194 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
195 payloadBufferSize += willPropertiesBuf.readableBytes();
196 } else {
197 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
198 }
199 try {
200 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
201
202 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
203 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
204 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
205 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
206 writeVariableLengthInt(buf, variablePartSize);
207
208 buf.writeShort(protocolNameBytes.length);
209 buf.writeBytes(protocolNameBytes);
210
211 buf.writeByte(variableHeader.version());
212 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
213 buf.writeShort(variableHeader.keepAliveTimeSeconds());
214 buf.writeBytes(propertiesBuf);
215
216
217 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
218 if (variableHeader.isWillFlag()) {
219 buf.writeBytes(willPropertiesBuf);
220 writeExactUTF8String(buf, willTopic, willTopicBytes);
221 buf.writeShort(willMessageBytes.length);
222 buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
223 }
224 if (variableHeader.hasUserName()) {
225 writeExactUTF8String(buf, userName, userNameBytes);
226 }
227 if (variableHeader.hasPassword()) {
228 buf.writeShort(passwordBytes.length);
229 buf.writeBytes(passwordBytes, 0, passwordBytes.length);
230 }
231 return buf;
232 } finally {
233 willPropertiesBuf.release();
234 }
235 } finally {
236 propertiesBuf.release();
237 }
238 }
239
240 private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
241 int flagByte = 0;
242 if (variableHeader.hasUserName()) {
243 flagByte |= 0x80;
244 }
245 if (variableHeader.hasPassword()) {
246 flagByte |= 0x40;
247 }
248 if (variableHeader.isWillRetain()) {
249 flagByte |= 0x20;
250 }
251 flagByte |= (variableHeader.willQos() & 0x03) << 3;
252 if (variableHeader.isWillFlag()) {
253 flagByte |= 0x04;
254 }
255 if (variableHeader.isCleanSession()) {
256 flagByte |= 0x02;
257 }
258 return flagByte;
259 }
260
261 private static ByteBuf encodeConnAckMessage(
262 ChannelHandlerContext ctx,
263 MqttConnAckMessage message) {
264 final MqttVersion mqttVersion = getMqttVersion(ctx);
265 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
266 ctx.alloc(),
267 message.variableHeader().properties());
268
269 try {
270 ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
271 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
272 writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
273 buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
274 buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
275 buf.writeBytes(propertiesBuf);
276 return buf;
277 } finally {
278 propertiesBuf.release();
279 }
280 }
281
282 private static ByteBuf encodeSubscribeMessage(
283 ChannelHandlerContext ctx,
284 MqttSubscribeMessage message) {
285 MqttVersion mqttVersion = getMqttVersion(ctx);
286 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
287 ctx.alloc(),
288 message.idAndPropertiesVariableHeader().properties());
289
290 try {
291 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
292 int payloadBufferSize = 0;
293
294 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
295 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
296 MqttSubscribePayload payload = message.payload();
297
298 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
299 String topicName = topic.topicName();
300 int topicNameBytes = utf8Bytes(topicName);
301 payloadBufferSize += 2 + topicNameBytes;
302 payloadBufferSize += 1;
303 }
304
305 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
306 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
307
308 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
309 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
310 writeVariableLengthInt(buf, variablePartSize);
311
312
313 int messageId = variableHeader.messageId();
314 buf.writeShort(messageId);
315 buf.writeBytes(propertiesBuf);
316
317
318 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
319 writeUnsafeUTF8String(buf, topic.topicName());
320 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
321 buf.writeByte(topic.qualityOfService().value());
322 } else {
323 final MqttSubscriptionOption option = topic.option();
324
325 int optionEncoded = option.retainHandling().value() << 4;
326 if (option.isRetainAsPublished()) {
327 optionEncoded |= 0x08;
328 }
329 if (option.isNoLocal()) {
330 optionEncoded |= 0x04;
331 }
332 optionEncoded |= option.qos().value();
333
334 buf.writeByte(optionEncoded);
335 }
336 }
337
338 return buf;
339 } finally {
340 propertiesBuf.release();
341 }
342 }
343
344 private static ByteBuf encodeUnsubscribeMessage(
345 ChannelHandlerContext ctx,
346 MqttUnsubscribeMessage message) {
347 MqttVersion mqttVersion = getMqttVersion(ctx);
348 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
349 ctx.alloc(),
350 message.idAndPropertiesVariableHeader().properties());
351
352 try {
353 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
354 int payloadBufferSize = 0;
355
356 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
357 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
358 MqttUnsubscribePayload payload = message.payload();
359
360 for (String topicName : payload.topics()) {
361 int topicNameBytes = utf8Bytes(topicName);
362 payloadBufferSize += 2 + topicNameBytes;
363 }
364
365 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
366 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
367
368 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
369 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
370 writeVariableLengthInt(buf, variablePartSize);
371
372
373 int messageId = variableHeader.messageId();
374 buf.writeShort(messageId);
375 buf.writeBytes(propertiesBuf);
376
377
378 for (String topicName : payload.topics()) {
379 writeUnsafeUTF8String(buf, topicName);
380 }
381
382 return buf;
383 } finally {
384 propertiesBuf.release();
385 }
386 }
387
388 private static ByteBuf encodeSubAckMessage(
389 ChannelHandlerContext ctx,
390 MqttSubAckMessage message) {
391 MqttVersion mqttVersion = getMqttVersion(ctx);
392 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
393 ctx.alloc(),
394 message.idAndPropertiesVariableHeader().properties());
395 try {
396 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
397 int payloadBufferSize = message.payload().grantedQoSLevels().size();
398 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
399 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
400 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
401 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
402 writeVariableLengthInt(buf, variablePartSize);
403 buf.writeShort(message.variableHeader().messageId());
404 buf.writeBytes(propertiesBuf);
405 for (int code: message.payload().reasonCodes()) {
406 buf.writeByte(code);
407 }
408
409 return buf;
410 } finally {
411 propertiesBuf.release();
412 }
413 }
414
415 private static ByteBuf encodeUnsubAckMessage(
416 ChannelHandlerContext ctx,
417 MqttUnsubAckMessage message) {
418 if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
419 MqttVersion mqttVersion = getMqttVersion(ctx);
420 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
421 ctx.alloc(),
422 message.idAndPropertiesVariableHeader().properties());
423 try {
424 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
425 MqttUnsubAckPayload payload = message.payload();
426 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
427 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
428 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
429 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
430 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
431 writeVariableLengthInt(buf, variablePartSize);
432 buf.writeShort(message.variableHeader().messageId());
433 buf.writeBytes(propertiesBuf);
434
435 if (payload != null) {
436 for (Short reasonCode : payload.unsubscribeReasonCodes()) {
437 buf.writeByte(reasonCode);
438 }
439 }
440
441 return buf;
442 } finally {
443 propertiesBuf.release();
444 }
445 } else {
446 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
447 }
448 }
449
450 private static ByteBuf encodePublishMessage(
451 ChannelHandlerContext ctx,
452 MqttPublishMessage message) {
453 MqttVersion mqttVersion = getMqttVersion(ctx);
454 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
455 MqttPublishVariableHeader variableHeader = message.variableHeader();
456 ByteBuf payload = message.payload().duplicate();
457
458 String topicName = variableHeader.topicName();
459 int topicNameBytes = utf8Bytes(topicName);
460
461 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
462 ctx.alloc(),
463 message.variableHeader().properties());
464
465 try {
466 boolean qosLevelGreaterZero = mqttFixedHeader.qosLevel().value() > 0;
467 int variableHeaderBufferSize = 2 + topicNameBytes +
468 (qosLevelGreaterZero ? 2 : 0) + propertiesBuf.readableBytes();
469 int payloadBufferSize = payload.readableBytes();
470 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
471 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
472
473 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
474 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
475 writeVariableLengthInt(buf, variablePartSize);
476 writeExactUTF8String(buf, topicName, topicNameBytes);
477 if (qosLevelGreaterZero) {
478 buf.writeShort(variableHeader.packetId());
479 }
480 buf.writeBytes(propertiesBuf);
481 buf.writeBytes(payload);
482
483 return buf;
484 } finally {
485 propertiesBuf.release();
486 }
487 }
488
489 private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
490 MqttMessage message) {
491 if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
492 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
493 MqttPubReplyMessageVariableHeader variableHeader =
494 (MqttPubReplyMessageVariableHeader) message.variableHeader();
495 int msgId = variableHeader.messageId();
496
497 final ByteBuf propertiesBuf;
498 final boolean includeReasonCode;
499 final int variableHeaderBufferSize;
500 final MqttVersion mqttVersion = getMqttVersion(ctx);
501 if (mqttVersion == MqttVersion.MQTT_5 &&
502 (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
503 !variableHeader.properties().isEmpty())) {
504 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
505 includeReasonCode = true;
506 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
507 } else {
508 propertiesBuf = Unpooled.EMPTY_BUFFER;
509 includeReasonCode = false;
510 variableHeaderBufferSize = 2;
511 }
512
513 try {
514 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
515 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
516 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
517 writeVariableLengthInt(buf, variableHeaderBufferSize);
518 buf.writeShort(msgId);
519 if (includeReasonCode) {
520 buf.writeByte(variableHeader.reasonCode());
521 }
522 buf.writeBytes(propertiesBuf);
523
524 return buf;
525 } finally {
526 propertiesBuf.release();
527 }
528 } else {
529 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
530 }
531 }
532
533 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
534 ByteBufAllocator byteBufAllocator,
535 MqttMessage message) {
536 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
537 MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
538 int msgId = variableHeader.messageId();
539
540 int variableHeaderBufferSize = 2;
541 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
542 ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
543 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
544 writeVariableLengthInt(buf, variableHeaderBufferSize);
545 buf.writeShort(msgId);
546
547 return buf;
548 }
549
550 private static ByteBuf encodeReasonCodePlusPropertiesMessage(
551 ChannelHandlerContext ctx,
552 MqttMessage message) {
553 if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
554 MqttVersion mqttVersion = getMqttVersion(ctx);
555 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
556 MqttReasonCodeAndPropertiesVariableHeader variableHeader =
557 (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
558
559 final ByteBuf propertiesBuf;
560 final boolean includeReasonCode;
561 final int variableHeaderBufferSize;
562 if (mqttVersion == MqttVersion.MQTT_5 &&
563 (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
564 !variableHeader.properties().isEmpty())) {
565 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
566 includeReasonCode = true;
567 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
568 } else {
569 propertiesBuf = Unpooled.EMPTY_BUFFER;
570 includeReasonCode = false;
571 variableHeaderBufferSize = 0;
572 }
573
574 try {
575 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
576 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
577 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
578 writeVariableLengthInt(buf, variableHeaderBufferSize);
579 if (includeReasonCode) {
580 buf.writeByte(variableHeader.reasonCode());
581 }
582 buf.writeBytes(propertiesBuf);
583
584 return buf;
585 } finally {
586 propertiesBuf.release();
587 }
588 } else {
589 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
590 }
591 }
592
593 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
594 ByteBufAllocator byteBufAllocator,
595 MqttMessage message) {
596 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
597 ByteBuf buf = byteBufAllocator.buffer(2);
598 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
599 buf.writeByte(0);
600
601 return buf;
602 }
603
604 private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
605 ByteBufAllocator byteBufAllocator,
606 MqttProperties mqttProperties) {
607 if (mqttVersion == MqttVersion.MQTT_5) {
608 return encodeProperties(byteBufAllocator, mqttProperties);
609 }
610 return Unpooled.EMPTY_BUFFER;
611 }
612
613 private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
614 MqttProperties mqttProperties) {
615 ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
616
617 try {
618 ByteBuf propertiesBuf = byteBufAllocator.buffer();
619 try {
620 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
621 int propertyId = property.propertyId;
622 switch (propertyId) {
623 case PAYLOAD_FORMAT_INDICATOR:
624 case REQUEST_PROBLEM_INFORMATION:
625 case REQUEST_RESPONSE_INFORMATION:
626 case MAXIMUM_QOS:
627 case RETAIN_AVAILABLE:
628 case WILDCARD_SUBSCRIPTION_AVAILABLE:
629 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
630 case SHARED_SUBSCRIPTION_AVAILABLE:
631 writeVariableLengthInt(propertiesBuf, propertyId);
632 final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
633 propertiesBuf.writeByte(bytePropValue);
634 break;
635 case SERVER_KEEP_ALIVE:
636 case RECEIVE_MAXIMUM:
637 case TOPIC_ALIAS_MAXIMUM:
638 case TOPIC_ALIAS:
639 writeVariableLengthInt(propertiesBuf, propertyId);
640 final short twoBytesInPropValue =
641 ((MqttProperties.IntegerProperty) property).value.shortValue();
642 propertiesBuf.writeShort(twoBytesInPropValue);
643 break;
644 case PUBLICATION_EXPIRY_INTERVAL:
645 case SESSION_EXPIRY_INTERVAL:
646 case WILL_DELAY_INTERVAL:
647 case MAXIMUM_PACKET_SIZE:
648 writeVariableLengthInt(propertiesBuf, propertyId);
649 final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
650 propertiesBuf.writeInt(fourBytesIntPropValue);
651 break;
652 case SUBSCRIPTION_IDENTIFIER:
653 writeVariableLengthInt(propertiesBuf, propertyId);
654 final int vbi = ((MqttProperties.IntegerProperty) property).value;
655 writeVariableLengthInt(propertiesBuf, vbi);
656 break;
657 case CONTENT_TYPE:
658 case RESPONSE_TOPIC:
659 case ASSIGNED_CLIENT_IDENTIFIER:
660 case AUTHENTICATION_METHOD:
661 case RESPONSE_INFORMATION:
662 case SERVER_REFERENCE:
663 case REASON_STRING:
664 writeVariableLengthInt(propertiesBuf, propertyId);
665 writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
666 break;
667 case USER_PROPERTY:
668 final List<MqttProperties.StringPair> pairs =
669 ((MqttProperties.UserProperties) property).value;
670 for (MqttProperties.StringPair pair : pairs) {
671 writeVariableLengthInt(propertiesBuf, propertyId);
672 writeEagerUTF8String(propertiesBuf, pair.key);
673 writeEagerUTF8String(propertiesBuf, pair.value);
674 }
675 break;
676 case CORRELATION_DATA:
677 case AUTHENTICATION_DATA:
678 writeVariableLengthInt(propertiesBuf, propertyId);
679 final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
680 propertiesBuf.writeShort(binaryPropValue.length);
681 propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
682 break;
683 default:
684
685 throw new EncoderException("Unknown property type: " + propertyId);
686 }
687 }
688 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
689 propertiesHeaderBuf.writeBytes(propertiesBuf);
690
691 return propertiesHeaderBuf;
692 } finally {
693 propertiesBuf.release();
694 }
695 } catch (RuntimeException e) {
696 propertiesHeaderBuf.release();
697 throw e;
698 }
699 }
700
701 private static int getFixedHeaderByte1(MqttFixedHeader header) {
702 int ret = 0;
703 ret |= header.messageType().value() << 4;
704 if (header.isDup()) {
705 ret |= 0x08;
706 }
707 ret |= header.qosLevel().value() << 1;
708 if (header.isRetain()) {
709 ret |= 0x01;
710 }
711 return ret;
712 }
713
714 private static void writeVariableLengthInt(ByteBuf buf, int num) {
715 do {
716 int digit = num & 0x7F;
717 num >>>= 7;
718 if (num > 0) {
719 digit |= 0x80;
720 }
721 buf.writeByte(digit);
722 } while (num > 0);
723 }
724
725 private static int nullableUtf8Bytes(String s) {
726 return s == null? 0 : utf8Bytes(s);
727 }
728
729 private static int nullableMaxUtf8Bytes(String s) {
730 return s == null? 0 : utf8MaxBytes(s);
731 }
732
733 private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
734 buf.ensureWritable(utf8Length + 2);
735 buf.writeShort(utf8Length);
736 if (utf8Length > 0) {
737 final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
738 assert writtenUtf8Length == utf8Length;
739 }
740 }
741
742 private static void writeEagerUTF8String(ByteBuf buf, String s) {
743 final int maxUtf8Length = nullableMaxUtf8Bytes(s);
744 buf.ensureWritable(maxUtf8Length + 2);
745 final int writerIndex = buf.writerIndex();
746 final int startUtf8String = writerIndex + 2;
747 buf.writerIndex(startUtf8String);
748 final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
749 buf.setShort(writerIndex, utf8Length);
750 }
751
752 private static void writeUnsafeUTF8String(ByteBuf buf, String s) {
753 final int writerIndex = buf.writerIndex();
754 final int startUtf8String = writerIndex + 2;
755
756 buf.writerIndex(startUtf8String);
757 final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, 0) : 0;
758 buf.setShort(writerIndex, utf8Length);
759 }
760
761 private static int getVariableLengthInt(int num) {
762 if (num < 128) {
763 return 1;
764 }
765 if (num < 16_384) {
766 return 2;
767 }
768 if (num < 2_097_152) {
769 return 3;
770 }
771 return 4;
772 }
773
774 }