1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.handler.codec.EncoderException;
25 import io.netty.handler.codec.MessageToMessageEncoder;
26 import io.netty.util.internal.EmptyArrays;
27
28 import java.util.List;
29
30 import static io.netty.buffer.ByteBufUtil.*;
31 import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35
36
37
38
39
40
41
42 @ChannelHandler.Sharable
43 public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
44
/**
 * Shared encoder instance. The constructor is private and the class is declared
 * {@code @ChannelHandler.Sharable}, so this is the only instance handed out.
 */
public static final MqttEncoder INSTANCE = new MqttEncoder();

// Private on purpose: callers obtain the encoder through INSTANCE.
private MqttEncoder() { }
48
/**
 * Encodes {@code msg} with {@link #doEncode(ChannelHandlerContext, MqttMessage)} and adds the
 * resulting buffer to {@code out}.
 */
@Override
protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
    out.add(doEncode(ctx, msg));
}
53
54
55
56
57
58
59
60
61 static ByteBuf doEncode(ChannelHandlerContext ctx,
62 MqttMessage message) {
63
64 switch (message.fixedHeader().messageType()) {
65 case CONNECT:
66 return encodeConnectMessage(ctx, (MqttConnectMessage) message);
67
68 case CONNACK:
69 return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
70
71 case PUBLISH:
72 return encodePublishMessage(ctx, (MqttPublishMessage) message);
73
74 case SUBSCRIBE:
75 return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
76
77 case UNSUBSCRIBE:
78 return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
79
80 case SUBACK:
81 return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
82
83 case UNSUBACK:
84 if (message instanceof MqttUnsubAckMessage) {
85 return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
86 }
87 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
88
89 case PUBACK:
90 case PUBREC:
91 case PUBREL:
92 case PUBCOMP:
93 return encodePubReplyMessage(ctx, message);
94
95 case DISCONNECT:
96 case AUTH:
97 return encodeReasonCodePlusPropertiesMessage(ctx, message);
98
99 case PINGREQ:
100 case PINGRESP:
101 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
102
103 default:
104 throw new IllegalArgumentException(
105 "Unknown message type: " + message.fixedHeader().messageType().value());
106 }
107 }
108
/**
 * Encodes a CONNECT message.
 * <p>
 * Layout written: fixed header byte, remaining length, protocol name (length-prefixed), version
 * byte, connect flags byte, keep-alive (2 bytes), properties, then the payload: client identifier,
 * and, depending on the flags in the variable header, will properties / will topic / will message,
 * user name and password.
 * <p>
 * The protocol version derived from the message is also recorded through
 * {@code setMqttVersion(ctx, ...)}; the other encoders in this class read it with
 * {@code getMqttVersion(ctx)}.
 *
 * @param ctx     handler context; supplies the allocator and receives the protocol version
 * @param message CONNECT message to encode
 * @return buffer containing the encoded message
 * @throws EncoderException                if, for 3.1 / 3.1.1, a password is flagged without a user name
 * @throws MqttIdentifierRejectedException if {@code isValidClientId} rejects the client identifier
 */
private static ByteBuf encodeConnectMessage(
        ChannelHandlerContext ctx,
        MqttConnectMessage message) {
    int payloadBufferSize = 0;

    MqttFixedHeader mqttFixedHeader = message.fixedHeader();
    MqttConnectVariableHeader variableHeader = message.variableHeader();
    MqttConnectPayload payload = message.payload();
    MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
            (byte) variableHeader.version());
    // Make the version available to later encode calls that use getMqttVersion(ctx).
    setMqttVersion(ctx, mqttVersion);

    // Only checked for 3.1 and 3.1.1: a password flag requires the user name flag.
    if ((mqttVersion == MqttVersion.MQTT_3_1 || mqttVersion == MqttVersion.MQTT_3_1_1) &&
            !variableHeader.hasUserName() && variableHeader.hasPassword()) {
        throw new EncoderException("Without a username, the password MUST be not set");
    }

    // Client id: 2-byte length prefix + UTF-8 bytes.
    String clientIdentifier = payload.clientIdentifier();
    if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
        throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
    }
    int clientIdentifierBytes = utf8Bytes(clientIdentifier);
    payloadBufferSize += 2 + clientIdentifierBytes;

    // Will topic and message; a null topic counts as 0 bytes and a null message as an empty array.
    // They only contribute to the size when the will flag is set.
    String willTopic = payload.willTopic();
    int willTopicBytes = nullableUtf8Bytes(willTopic);
    byte[] willMessage = payload.willMessageInBytes();
    byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
    if (variableHeader.isWillFlag()) {
        payloadBufferSize += 2 + willTopicBytes;
        payloadBufferSize += 2 + willMessageBytes.length;
    }

    // User name: counted only when its flag is set.
    String userName = payload.userName();
    int userNameBytes = nullableUtf8Bytes(userName);
    if (variableHeader.hasUserName()) {
        payloadBufferSize += 2 + userNameBytes;
    }

    // Password: a null password is treated as an empty array; counted only when its flag is set.
    byte[] password = payload.passwordInBytes();
    byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
    if (variableHeader.hasPassword()) {
        payloadBufferSize += 2 + passwordBytes.length;
    }

    // Properties are encoded up front so their size is known before the output buffer is allocated.
    byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
    ByteBuf propertiesBuf = encodePropertiesIfNeeded(
            mqttVersion,
            ctx.alloc(),
            message.variableHeader().properties());
    try {
        final ByteBuf willPropertiesBuf;
        if (variableHeader.isWillFlag()) {
            willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
            payloadBufferSize += willPropertiesBuf.readableBytes();
        } else {
            willPropertiesBuf = Unpooled.EMPTY_BUFFER;
        }
        try {
            // 2-byte name length + name + 4 (version byte, flags byte, 2-byte keep-alive) + properties.
            int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();

            int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
            int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
            ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
            buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
            writeVariableLengthInt(buf, variablePartSize);

            buf.writeShort(protocolNameBytes.length);
            buf.writeBytes(protocolNameBytes);

            buf.writeByte(variableHeader.version());
            buf.writeByte(getConnVariableHeaderFlag(variableHeader));
            buf.writeShort(variableHeader.keepAliveTimeSeconds());
            buf.writeBytes(propertiesBuf);

            // Payload, in the same order the sizes were accumulated above.
            writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
            if (variableHeader.isWillFlag()) {
                buf.writeBytes(willPropertiesBuf);
                writeExactUTF8String(buf, willTopic, willTopicBytes);
                buf.writeShort(willMessageBytes.length);
                buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
            }
            if (variableHeader.hasUserName()) {
                writeExactUTF8String(buf, userName, userNameBytes);
            }
            if (variableHeader.hasPassword()) {
                buf.writeShort(passwordBytes.length);
                buf.writeBytes(passwordBytes, 0, passwordBytes.length);
            }
            return buf;
        } finally {
            // Temporary buffer: its content has been copied into buf (or it is the shared empty buffer).
            willPropertiesBuf.release();
        }
    } finally {
        propertiesBuf.release();
    }
}
211
212 private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
213 int flagByte = 0;
214 if (variableHeader.hasUserName()) {
215 flagByte |= 0x80;
216 }
217 if (variableHeader.hasPassword()) {
218 flagByte |= 0x40;
219 }
220 if (variableHeader.isWillRetain()) {
221 flagByte |= 0x20;
222 }
223 flagByte |= (variableHeader.willQos() & 0x03) << 3;
224 if (variableHeader.isWillFlag()) {
225 flagByte |= 0x04;
226 }
227 if (variableHeader.isCleanSession()) {
228 flagByte |= 0x02;
229 }
230 return flagByte;
231 }
232
233 private static ByteBuf encodeConnAckMessage(
234 ChannelHandlerContext ctx,
235 MqttConnAckMessage message) {
236 final MqttVersion mqttVersion = getMqttVersion(ctx);
237 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
238 ctx.alloc(),
239 message.variableHeader().properties());
240
241 try {
242 ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
243 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
244 writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
245 buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
246 buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
247 buf.writeBytes(propertiesBuf);
248 return buf;
249 } finally {
250 propertiesBuf.release();
251 }
252 }
253
254 private static ByteBuf encodeSubscribeMessage(
255 ChannelHandlerContext ctx,
256 MqttSubscribeMessage message) {
257 MqttVersion mqttVersion = getMqttVersion(ctx);
258 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
259 ctx.alloc(),
260 message.idAndPropertiesVariableHeader().properties());
261
262 try {
263 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
264 int payloadBufferSize = 0;
265
266 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
267 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
268 MqttSubscribePayload payload = message.payload();
269
270 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
271 String topicName = topic.topicName();
272 int topicNameBytes = utf8Bytes(topicName);
273 payloadBufferSize += 2 + topicNameBytes;
274 payloadBufferSize += 1;
275 }
276
277 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
278 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
279
280 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
281 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
282 writeVariableLengthInt(buf, variablePartSize);
283
284
285 int messageId = variableHeader.messageId();
286 buf.writeShort(messageId);
287 buf.writeBytes(propertiesBuf);
288
289
290 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
291 writeEagerUTF8String(buf, topic.topicName());
292 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
293 buf.writeByte(topic.qualityOfService().value());
294 } else {
295 final MqttSubscriptionOption option = topic.option();
296
297 int optionEncoded = option.retainHandling().value() << 4;
298 if (option.isRetainAsPublished()) {
299 optionEncoded |= 0x08;
300 }
301 if (option.isNoLocal()) {
302 optionEncoded |= 0x04;
303 }
304 optionEncoded |= option.qos().value();
305
306 buf.writeByte(optionEncoded);
307 }
308 }
309
310 return buf;
311 } finally {
312 propertiesBuf.release();
313 }
314 }
315
316 private static ByteBuf encodeUnsubscribeMessage(
317 ChannelHandlerContext ctx,
318 MqttUnsubscribeMessage message) {
319 MqttVersion mqttVersion = getMqttVersion(ctx);
320 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
321 ctx.alloc(),
322 message.idAndPropertiesVariableHeader().properties());
323
324 try {
325 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
326 int payloadBufferSize = 0;
327
328 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
329 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
330 MqttUnsubscribePayload payload = message.payload();
331
332 for (String topicName : payload.topics()) {
333 int topicNameBytes = utf8Bytes(topicName);
334 payloadBufferSize += 2 + topicNameBytes;
335 }
336
337 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
338 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
339
340 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
341 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
342 writeVariableLengthInt(buf, variablePartSize);
343
344
345 int messageId = variableHeader.messageId();
346 buf.writeShort(messageId);
347 buf.writeBytes(propertiesBuf);
348
349
350 for (String topicName : payload.topics()) {
351 writeEagerUTF8String(buf, topicName);
352 }
353
354 return buf;
355 } finally {
356 propertiesBuf.release();
357 }
358 }
359
360 private static ByteBuf encodeSubAckMessage(
361 ChannelHandlerContext ctx,
362 MqttSubAckMessage message) {
363 MqttVersion mqttVersion = getMqttVersion(ctx);
364 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
365 ctx.alloc(),
366 message.idAndPropertiesVariableHeader().properties());
367 try {
368 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
369 int payloadBufferSize = message.payload().grantedQoSLevels().size();
370 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
371 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
372 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
373 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
374 writeVariableLengthInt(buf, variablePartSize);
375 buf.writeShort(message.variableHeader().messageId());
376 buf.writeBytes(propertiesBuf);
377 for (int code: message.payload().reasonCodes()) {
378 buf.writeByte(code);
379 }
380
381 return buf;
382 } finally {
383 propertiesBuf.release();
384 }
385 }
386
387 private static ByteBuf encodeUnsubAckMessage(
388 ChannelHandlerContext ctx,
389 MqttUnsubAckMessage message) {
390 if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
391 MqttVersion mqttVersion = getMqttVersion(ctx);
392 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
393 ctx.alloc(),
394 message.idAndPropertiesVariableHeader().properties());
395 try {
396 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
397 MqttUnsubAckPayload payload = message.payload();
398 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
399 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
400 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
401 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
402 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
403 writeVariableLengthInt(buf, variablePartSize);
404 buf.writeShort(message.variableHeader().messageId());
405 buf.writeBytes(propertiesBuf);
406
407 if (payload != null) {
408 for (Short reasonCode : payload.unsubscribeReasonCodes()) {
409 buf.writeByte(reasonCode);
410 }
411 }
412
413 return buf;
414 } finally {
415 propertiesBuf.release();
416 }
417 } else {
418 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
419 }
420 }
421
422 private static ByteBuf encodePublishMessage(
423 ChannelHandlerContext ctx,
424 MqttPublishMessage message) {
425 MqttVersion mqttVersion = getMqttVersion(ctx);
426 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
427 MqttPublishVariableHeader variableHeader = message.variableHeader();
428 ByteBuf payload = message.payload().duplicate();
429
430 String topicName = variableHeader.topicName();
431 int topicNameBytes = utf8Bytes(topicName);
432
433 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
434 ctx.alloc(),
435 message.variableHeader().properties());
436
437 try {
438 int variableHeaderBufferSize = 2 + topicNameBytes +
439 (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes();
440 int payloadBufferSize = payload.readableBytes();
441 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
442 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
443
444 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
445 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
446 writeVariableLengthInt(buf, variablePartSize);
447 writeExactUTF8String(buf, topicName, topicNameBytes);
448 if (mqttFixedHeader.qosLevel().value() > 0) {
449 buf.writeShort(variableHeader.packetId());
450 }
451 buf.writeBytes(propertiesBuf);
452 buf.writeBytes(payload);
453
454 return buf;
455 } finally {
456 propertiesBuf.release();
457 }
458 }
459
460 private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
461 MqttMessage message) {
462 if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
463 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
464 MqttPubReplyMessageVariableHeader variableHeader =
465 (MqttPubReplyMessageVariableHeader) message.variableHeader();
466 int msgId = variableHeader.messageId();
467
468 final ByteBuf propertiesBuf;
469 final boolean includeReasonCode;
470 final int variableHeaderBufferSize;
471 final MqttVersion mqttVersion = getMqttVersion(ctx);
472 if (mqttVersion == MqttVersion.MQTT_5 &&
473 (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
474 !variableHeader.properties().isEmpty())) {
475 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
476 includeReasonCode = true;
477 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
478 } else {
479 propertiesBuf = Unpooled.EMPTY_BUFFER;
480 includeReasonCode = false;
481 variableHeaderBufferSize = 2;
482 }
483
484 try {
485 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
486 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
487 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
488 writeVariableLengthInt(buf, variableHeaderBufferSize);
489 buf.writeShort(msgId);
490 if (includeReasonCode) {
491 buf.writeByte(variableHeader.reasonCode());
492 }
493 buf.writeBytes(propertiesBuf);
494
495 return buf;
496 } finally {
497 propertiesBuf.release();
498 }
499 } else {
500 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
501 }
502 }
503
504 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
505 ByteBufAllocator byteBufAllocator,
506 MqttMessage message) {
507 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
508 MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
509 int msgId = variableHeader.messageId();
510
511 int variableHeaderBufferSize = 2;
512 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
513 ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
514 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
515 writeVariableLengthInt(buf, variableHeaderBufferSize);
516 buf.writeShort(msgId);
517
518 return buf;
519 }
520
521 private static ByteBuf encodeReasonCodePlusPropertiesMessage(
522 ChannelHandlerContext ctx,
523 MqttMessage message) {
524 if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
525 MqttVersion mqttVersion = getMqttVersion(ctx);
526 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
527 MqttReasonCodeAndPropertiesVariableHeader variableHeader =
528 (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
529
530 final ByteBuf propertiesBuf;
531 final boolean includeReasonCode;
532 final int variableHeaderBufferSize;
533 if (mqttVersion == MqttVersion.MQTT_5 &&
534 (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
535 !variableHeader.properties().isEmpty())) {
536 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
537 includeReasonCode = true;
538 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
539 } else {
540 propertiesBuf = Unpooled.EMPTY_BUFFER;
541 includeReasonCode = false;
542 variableHeaderBufferSize = 0;
543 }
544
545 try {
546 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
547 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
548 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
549 writeVariableLengthInt(buf, variableHeaderBufferSize);
550 if (includeReasonCode) {
551 buf.writeByte(variableHeader.reasonCode());
552 }
553 buf.writeBytes(propertiesBuf);
554
555 return buf;
556 } finally {
557 propertiesBuf.release();
558 }
559 } else {
560 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
561 }
562 }
563
564 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
565 ByteBufAllocator byteBufAllocator,
566 MqttMessage message) {
567 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
568 ByteBuf buf = byteBufAllocator.buffer(2);
569 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
570 buf.writeByte(0);
571
572 return buf;
573 }
574
575 private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
576 ByteBufAllocator byteBufAllocator,
577 MqttProperties mqttProperties) {
578 if (mqttVersion == MqttVersion.MQTT_5) {
579 return encodeProperties(byteBufAllocator, mqttProperties);
580 }
581 return Unpooled.EMPTY_BUFFER;
582 }
583
/**
 * Encodes {@code mqttProperties} into a new buffer: the total size of the encoded properties as
 * a variable-length integer, followed by the properties themselves.
 * <p>
 * Every property is written as its id (variable-length integer) followed by its value, whose
 * width depends on the property type: one byte, two bytes, four bytes, a variable-length integer,
 * a length-prefixed UTF-8 string, a key/value string pair, or length-prefixed binary data.
 * <p>
 * The callers in this class release the returned buffer once its content has been copied.
 *
 * @param byteBufAllocator allocator for the result and for the scratch buffer
 * @param mqttProperties   properties to encode
 * @return buffer holding the length prefix and the encoded properties
 * @throws EncoderException if a property type has no case in the switch
 */
private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
                                        MqttProperties mqttProperties) {
    // Result buffer: length prefix + properties.
    ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
    // encode also the Properties part
    try {
        // Scratch buffer: the properties are written here first so their size is known
        // before the length prefix is emitted.
        ByteBuf propertiesBuf = byteBufAllocator.buffer();
        try {
            for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
                MqttProperties.MqttPropertyType propertyType =
                        MqttProperties.MqttPropertyType.valueOf(property.propertyId);
                switch (propertyType) {
                    // Single-byte values.
                    case PAYLOAD_FORMAT_INDICATOR:
                    case REQUEST_PROBLEM_INFORMATION:
                    case REQUEST_RESPONSE_INFORMATION:
                    case MAXIMUM_QOS:
                    case RETAIN_AVAILABLE:
                    case WILDCARD_SUBSCRIPTION_AVAILABLE:
                    case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
                    case SHARED_SUBSCRIPTION_AVAILABLE:
                        writeVariableLengthInt(propertiesBuf, property.propertyId);
                        final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
                        propertiesBuf.writeByte(bytePropValue);
                        break;
                    // Two-byte values.
                    case SERVER_KEEP_ALIVE:
                    case RECEIVE_MAXIMUM:
                    case TOPIC_ALIAS_MAXIMUM:
                    case TOPIC_ALIAS:
                        writeVariableLengthInt(propertiesBuf, property.propertyId);
                        final short twoBytesInPropValue =
                                ((MqttProperties.IntegerProperty) property).value.shortValue();
                        propertiesBuf.writeShort(twoBytesInPropValue);
                        break;
                    // Four-byte values.
                    case PUBLICATION_EXPIRY_INTERVAL:
                    case SESSION_EXPIRY_INTERVAL:
                    case WILL_DELAY_INTERVAL:
                    case MAXIMUM_PACKET_SIZE:
                        writeVariableLengthInt(propertiesBuf, property.propertyId);
                        final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
                        propertiesBuf.writeInt(fourBytesIntPropValue);
                        break;
                    // Variable-length integer value.
                    case SUBSCRIPTION_IDENTIFIER:
                        writeVariableLengthInt(propertiesBuf, property.propertyId);
                        final int vbi = ((MqttProperties.IntegerProperty) property).value;
                        writeVariableLengthInt(propertiesBuf, vbi);
                        break;
                    // Length-prefixed UTF-8 string values.
                    case CONTENT_TYPE:
                    case RESPONSE_TOPIC:
                    case ASSIGNED_CLIENT_IDENTIFIER:
                    case AUTHENTICATION_METHOD:
                    case RESPONSE_INFORMATION:
                    case SERVER_REFERENCE:
                    case REASON_STRING:
                        writeVariableLengthInt(propertiesBuf, property.propertyId);
                        writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
                        break;
                    // One id + key + value entry is written per pair.
                    case USER_PROPERTY:
                        final List<MqttProperties.StringPair> pairs =
                                ((MqttProperties.UserProperties) property).value;
                        for (MqttProperties.StringPair pair : pairs) {
                            writeVariableLengthInt(propertiesBuf, property.propertyId);
                            writeEagerUTF8String(propertiesBuf, pair.key);
                            writeEagerUTF8String(propertiesBuf, pair.value);
                        }
                        break;
                    // Length-prefixed binary values.
                    case CORRELATION_DATA:
                    case AUTHENTICATION_DATA:
                        writeVariableLengthInt(propertiesBuf, property.propertyId);
                        final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
                        propertiesBuf.writeShort(binaryPropValue.length);
                        propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
                        break;
                    default:
                        // A property type without an encoding rule above cannot be serialized.
                        throw new EncoderException("Unknown property type: " + propertyType);
                }
            }
            // Length prefix first, then the already-encoded properties.
            writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
            propertiesHeaderBuf.writeBytes(propertiesBuf);

            return propertiesHeaderBuf;
        } finally {
            // The scratch buffer is released on success and on failure alike.
            propertiesBuf.release();
        }
    } catch (RuntimeException e) {
        // The result buffer will not reach the caller, so release it before rethrowing.
        propertiesHeaderBuf.release();
        throw e;
    }
}
672
673 private static int getFixedHeaderByte1(MqttFixedHeader header) {
674 int ret = 0;
675 ret |= header.messageType().value() << 4;
676 if (header.isDup()) {
677 ret |= 0x08;
678 }
679 ret |= header.qosLevel().value() << 1;
680 if (header.isRetain()) {
681 ret |= 0x01;
682 }
683 return ret;
684 }
685
686 private static void writeVariableLengthInt(ByteBuf buf, int num) {
687 do {
688 int digit = num % 128;
689 num /= 128;
690 if (num > 0) {
691 digit |= 0x80;
692 }
693 buf.writeByte(digit);
694 } while (num > 0);
695 }
696
697 private static int nullableUtf8Bytes(String s) {
698 return s == null? 0 : utf8Bytes(s);
699 }
700
701 private static int nullableMaxUtf8Bytes(String s) {
702 return s == null? 0 : utf8MaxBytes(s);
703 }
704
705 private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
706 buf.ensureWritable(utf8Length + 2);
707 buf.writeShort(utf8Length);
708 if (utf8Length > 0) {
709 final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
710 assert writtenUtf8Length == utf8Length;
711 }
712 }
713
714 private static void writeEagerUTF8String(ByteBuf buf, String s) {
715 final int maxUtf8Length = nullableMaxUtf8Bytes(s);
716 buf.ensureWritable(maxUtf8Length + 2);
717 final int writerIndex = buf.writerIndex();
718 final int startUtf8String = writerIndex + 2;
719 buf.writerIndex(startUtf8String);
720 final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
721 buf.setShort(writerIndex, utf8Length);
722 }
723
724 private static int getVariableLengthInt(int num) {
725 int count = 0;
726 do {
727 num /= 128;
728 count++;
729 } while (num > 0);
730 return count;
731 }
732
733 }