View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.channel.ChannelHandler;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.handler.codec.DecoderException;
24  import io.netty.handler.codec.MessageToMessageEncoder;
25  import io.netty.util.CharsetUtil;
26  import io.netty.util.internal.EmptyArrays;
27  
28  import java.util.List;
29  
30  import static io.netty.handler.codec.mqtt.MqttCodecUtil.*;
31  
32  /**
33   * Encodes Mqtt messages into bytes following the protocol specification v3.1
34   * as described here <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
35   */
36  @ChannelHandler.Sharable
37  public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
38  
39      public static final MqttEncoder INSTANCE = new MqttEncoder();
40  
41      private MqttEncoder() { }
42  
43      @Override
44      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
45          out.add(doEncode(ctx.alloc(), msg));
46      }
47  
48      /**
49       * This is the main encoding method.
50       * It's only visible for testing.
51       *
52       * @param byteBufAllocator Allocates ByteBuf
53       * @param message MQTT message to encode
54       * @return ByteBuf with encoded bytes
55       */
56      static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {
57  
58          switch (message.fixedHeader().messageType()) {
59              case CONNECT:
60                  return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);
61  
62              case CONNACK:
63                  return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);
64  
65              case PUBLISH:
66                  return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);
67  
68              case SUBSCRIBE:
69                  return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);
70  
71              case UNSUBSCRIBE:
72                  return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);
73  
74              case SUBACK:
75                  return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);
76  
77              case UNSUBACK:
78              case PUBACK:
79              case PUBREC:
80              case PUBREL:
81              case PUBCOMP:
82                  return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);
83  
84              case PINGREQ:
85              case PINGRESP:
86              case DISCONNECT:
87                  return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);
88  
89              default:
90                  throw new IllegalArgumentException(
91                          "Unknown message type: " + message.fixedHeader().messageType().value());
92          }
93      }
94  
95      private static ByteBuf encodeConnectMessage(
96              ByteBufAllocator byteBufAllocator,
97              MqttConnectMessage message) {
98          int payloadBufferSize = 0;
99  
100         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
101         MqttConnectVariableHeader variableHeader = message.variableHeader();
102         MqttConnectPayload payload = message.payload();
103         MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
104                 (byte) variableHeader.version());
105 
106         // as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
107         if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
108             throw new DecoderException("Without a username, the password MUST be not set");
109         }
110 
111         // Client id
112         String clientIdentifier = payload.clientIdentifier();
113         if (!isValidClientId(mqttVersion, clientIdentifier)) {
114             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
115         }
116         byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
117         payloadBufferSize += 2 + clientIdentifierBytes.length;
118 
119         // Will topic and message
120         String willTopic = payload.willTopic();
121         byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;
122         byte[] willMessage = payload.willMessageInBytes();
123         byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
124         if (variableHeader.isWillFlag()) {
125             payloadBufferSize += 2 + willTopicBytes.length;
126             payloadBufferSize += 2 + willMessageBytes.length;
127         }
128 
129         String userName = payload.userName();
130         byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;
131         if (variableHeader.hasUserName()) {
132             payloadBufferSize += 2 + userNameBytes.length;
133         }
134 
135         byte[] password = payload.passwordInBytes();
136         byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
137         if (variableHeader.hasPassword()) {
138             payloadBufferSize += 2 + passwordBytes.length;
139         }
140 
141         // Fixed header
142         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
143         int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
144         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
145         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
146         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
147         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
148         writeVariableLengthInt(buf, variablePartSize);
149 
150         buf.writeShort(protocolNameBytes.length);
151         buf.writeBytes(protocolNameBytes);
152 
153         buf.writeByte(variableHeader.version());
154         buf.writeByte(getConnVariableHeaderFlag(variableHeader));
155         buf.writeShort(variableHeader.keepAliveTimeSeconds());
156 
157         // Payload
158         buf.writeShort(clientIdentifierBytes.length);
159         buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
160         if (variableHeader.isWillFlag()) {
161             buf.writeShort(willTopicBytes.length);
162             buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
163             buf.writeShort(willMessageBytes.length);
164             buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
165         }
166         if (variableHeader.hasUserName()) {
167             buf.writeShort(userNameBytes.length);
168             buf.writeBytes(userNameBytes, 0, userNameBytes.length);
169         }
170         if (variableHeader.hasPassword()) {
171             buf.writeShort(passwordBytes.length);
172             buf.writeBytes(passwordBytes, 0, passwordBytes.length);
173         }
174         return buf;
175     }
176 
177     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
178         int flagByte = 0;
179         if (variableHeader.hasUserName()) {
180             flagByte |= 0x80;
181         }
182         if (variableHeader.hasPassword()) {
183             flagByte |= 0x40;
184         }
185         if (variableHeader.isWillRetain()) {
186             flagByte |= 0x20;
187         }
188         flagByte |= (variableHeader.willQos() & 0x03) << 3;
189         if (variableHeader.isWillFlag()) {
190             flagByte |= 0x04;
191         }
192         if (variableHeader.isCleanSession()) {
193             flagByte |= 0x02;
194         }
195         return flagByte;
196     }
197 
198     private static ByteBuf encodeConnAckMessage(
199             ByteBufAllocator byteBufAllocator,
200             MqttConnAckMessage message) {
201         ByteBuf buf = byteBufAllocator.buffer(4);
202         buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
203         buf.writeByte(2);
204         buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
205         buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
206 
207         return buf;
208     }
209 
210     private static ByteBuf encodeSubscribeMessage(
211             ByteBufAllocator byteBufAllocator,
212             MqttSubscribeMessage message) {
213         int variableHeaderBufferSize = 2;
214         int payloadBufferSize = 0;
215 
216         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
217         MqttMessageIdVariableHeader variableHeader = message.variableHeader();
218         MqttSubscribePayload payload = message.payload();
219 
220         for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
221             String topicName = topic.topicName();
222             byte[] topicNameBytes = encodeStringUtf8(topicName);
223             payloadBufferSize += 2 + topicNameBytes.length;
224             payloadBufferSize += 1;
225         }
226 
227         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
228         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
229 
230         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
231         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
232         writeVariableLengthInt(buf, variablePartSize);
233 
234         // Variable Header
235         int messageId = variableHeader.messageId();
236         buf.writeShort(messageId);
237 
238         // Payload
239         for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
240             String topicName = topic.topicName();
241             byte[] topicNameBytes = encodeStringUtf8(topicName);
242             buf.writeShort(topicNameBytes.length);
243             buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
244             buf.writeByte(topic.qualityOfService().value());
245         }
246 
247         return buf;
248     }
249 
250     private static ByteBuf encodeUnsubscribeMessage(
251             ByteBufAllocator byteBufAllocator,
252             MqttUnsubscribeMessage message) {
253         int variableHeaderBufferSize = 2;
254         int payloadBufferSize = 0;
255 
256         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
257         MqttMessageIdVariableHeader variableHeader = message.variableHeader();
258         MqttUnsubscribePayload payload = message.payload();
259 
260         for (String topicName : payload.topics()) {
261             byte[] topicNameBytes = encodeStringUtf8(topicName);
262             payloadBufferSize += 2 + topicNameBytes.length;
263         }
264 
265         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
266         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
267 
268         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
269         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
270         writeVariableLengthInt(buf, variablePartSize);
271 
272         // Variable Header
273         int messageId = variableHeader.messageId();
274         buf.writeShort(messageId);
275 
276         // Payload
277         for (String topicName : payload.topics()) {
278             byte[] topicNameBytes = encodeStringUtf8(topicName);
279             buf.writeShort(topicNameBytes.length);
280             buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
281         }
282 
283         return buf;
284     }
285 
286     private static ByteBuf encodeSubAckMessage(
287             ByteBufAllocator byteBufAllocator,
288             MqttSubAckMessage message) {
289         int variableHeaderBufferSize = 2;
290         int payloadBufferSize = message.payload().grantedQoSLevels().size();
291         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
292         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
293         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
294         buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
295         writeVariableLengthInt(buf, variablePartSize);
296         buf.writeShort(message.variableHeader().messageId());
297         for (int qos : message.payload().grantedQoSLevels()) {
298             buf.writeByte(qos);
299         }
300 
301         return buf;
302     }
303 
304     private static ByteBuf encodePublishMessage(
305             ByteBufAllocator byteBufAllocator,
306             MqttPublishMessage message) {
307         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
308         MqttPublishVariableHeader variableHeader = message.variableHeader();
309         ByteBuf payload = message.payload().duplicate();
310 
311         String topicName = variableHeader.topicName();
312         byte[] topicNameBytes = encodeStringUtf8(topicName);
313 
314         int variableHeaderBufferSize = 2 + topicNameBytes.length +
315                 (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
316         int payloadBufferSize = payload.readableBytes();
317         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
318         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
319 
320         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
321         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
322         writeVariableLengthInt(buf, variablePartSize);
323         buf.writeShort(topicNameBytes.length);
324         buf.writeBytes(topicNameBytes);
325         if (mqttFixedHeader.qosLevel().value() > 0) {
326             buf.writeShort(variableHeader.messageId());
327         }
328         buf.writeBytes(payload);
329 
330         return buf;
331     }
332 
333     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
334             ByteBufAllocator byteBufAllocator,
335             MqttMessage message) {
336         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
337         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
338         int msgId = variableHeader.messageId();
339 
340         int variableHeaderBufferSize = 2; // variable part only has a message id
341         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
342         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
343         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
344         writeVariableLengthInt(buf, variableHeaderBufferSize);
345         buf.writeShort(msgId);
346 
347         return buf;
348     }
349 
350     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
351             ByteBufAllocator byteBufAllocator,
352             MqttMessage message) {
353         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
354         ByteBuf buf = byteBufAllocator.buffer(2);
355         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
356         buf.writeByte(0);
357 
358         return buf;
359     }
360 
361     private static int getFixedHeaderByte1(MqttFixedHeader header) {
362         int ret = 0;
363         ret |= header.messageType().value() << 4;
364         if (header.isDup()) {
365             ret |= 0x08;
366         }
367         ret |= header.qosLevel().value() << 1;
368         if (header.isRetain()) {
369             ret |= 0x01;
370         }
371         return ret;
372     }
373 
374     private static void writeVariableLengthInt(ByteBuf buf, int num) {
375         do {
376             int digit = num % 128;
377             num /= 128;
378             if (num > 0) {
379                 digit |= 0x80;
380             }
381             buf.writeByte(digit);
382         } while (num > 0);
383     }
384 
385     private static int getVariableLengthInt(int num) {
386         int count = 0;
387         do {
388             num /= 128;
389             count++;
390         } while (num > 0);
391         return count;
392     }
393 
394     private static byte[] encodeStringUtf8(String s) {
395       return s.getBytes(CharsetUtil.UTF_8);
396     }
397 }