1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.DecoderException;
21 import io.netty.util.Attribute;
22 import io.netty.util.AttributeKey;
23
24 import static io.netty.handler.codec.mqtt.MqttConstant.MIN_CLIENT_ID_LENGTH;
25
26 final class MqttCodecUtil {
27
28 static final AttributeKey<MqttVersion> MQTT_VERSION_KEY = AttributeKey.valueOf("NETTY_CODEC_MQTT_VERSION");
29
30 static MqttVersion getMqttVersion(ChannelHandlerContext ctx) {
31 Attribute<MqttVersion> attr = ctx.channel().attr(MQTT_VERSION_KEY);
32 MqttVersion version = attr.get();
33 if (version == null) {
34 return MqttVersion.MQTT_3_1_1;
35 }
36 return version;
37 }
38
39 static void setMqttVersion(ChannelHandlerContext ctx, MqttVersion version) {
40 Attribute<MqttVersion> attr = ctx.channel().attr(MQTT_VERSION_KEY);
41 attr.set(version);
42 }
43
44 static boolean isValidPublishTopicName(String topicName) {
45
46 for (int i = 0; i < topicName.length(); i++) {
47 char c = topicName.charAt(i);
48 if (c == '#' || c == '+') {
49 return false;
50 }
51 }
52 return true;
53 }
54
55 static boolean isValidMessageId(int messageId) {
56 return messageId != 0;
57 }
58
59 static boolean isValidClientId(MqttVersion mqttVersion, int maxClientIdLength, String clientId) {
60 if (mqttVersion == MqttVersion.MQTT_3_1) {
61 return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
62 clientId.length() <= maxClientIdLength;
63 }
64 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_5) {
65
66
67 return clientId != null;
68 }
69 throw new IllegalArgumentException(mqttVersion + " is unknown mqtt version");
70 }
71
72 static MqttFixedHeader validateFixedHeader(ChannelHandlerContext ctx, MqttFixedHeader mqttFixedHeader) {
73 switch (mqttFixedHeader.messageType()) {
74 case PUBREL:
75 case SUBSCRIBE:
76 case UNSUBSCRIBE:
77 if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
78 throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
79 }
80 return mqttFixedHeader;
81 case AUTH:
82 if (MqttCodecUtil.getMqttVersion(ctx) != MqttVersion.MQTT_5) {
83 throw new DecoderException("AUTH message requires at least MQTT 5");
84 }
85 return mqttFixedHeader;
86 default:
87 return mqttFixedHeader;
88 }
89 }
90
91 static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
92 switch (mqttFixedHeader.messageType()) {
93 case CONNECT:
94 case CONNACK:
95 case PUBACK:
96 case PUBREC:
97 case PUBCOMP:
98 case SUBACK:
99 case UNSUBACK:
100 case PINGREQ:
101 case PINGRESP:
102 case DISCONNECT:
103 if (mqttFixedHeader.isDup() ||
104 mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
105 mqttFixedHeader.isRetain()) {
106 return new MqttFixedHeader(
107 mqttFixedHeader.messageType(),
108 false,
109 MqttQoS.AT_MOST_ONCE,
110 false,
111 mqttFixedHeader.remainingLength());
112 }
113 return mqttFixedHeader;
114 case PUBREL:
115 case SUBSCRIBE:
116 case UNSUBSCRIBE:
117 if (mqttFixedHeader.isRetain()) {
118 return new MqttFixedHeader(
119 mqttFixedHeader.messageType(),
120 mqttFixedHeader.isDup(),
121 mqttFixedHeader.qosLevel(),
122 false,
123 mqttFixedHeader.remainingLength());
124 }
125 return mqttFixedHeader;
126 default:
127 return mqttFixedHeader;
128 }
129 }
130
131 private MqttCodecUtil() { }
132 }