View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.mqtt;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.handler.codec.MessageToMessageEncoder;
23  import io.netty.util.CharsetUtil;
24  
25  import java.util.List;
26  
27  import static io.netty.handler.codec.mqtt.MqttCodecUtil.*;
28  
29  /**
30   * Encodes Mqtt messages into bytes following the protocl specification v3.1
31   * as described here <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
32   */
33  public class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
34  
35      public static final MqttEncoder DEFAUL_ENCODER = new MqttEncoder();
36  
37      private static final byte[] EMPTY = new byte[0];
38  
39      @Override
40      protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
41          out.add(doEncode(ctx.alloc(), msg));
42      }
43  
44      /**
45       * This is the main encoding method.
46       * It's only visible for testing.
47       *
48       * @param byteBufAllocator Allocates ByteBuf
49       * @param message MQTT message to encode
50       * @return ByteBuf with encoded bytes
51       */
52      static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {
53  
54          switch (message.fixedHeader().messageType()) {
55              case CONNECT:
56                  return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);
57  
58              case CONNACK:
59                  return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);
60  
61              case PUBLISH:
62                  return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);
63  
64              case SUBSCRIBE:
65                  return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);
66  
67              case UNSUBSCRIBE:
68                  return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);
69  
70              case SUBACK:
71                  return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);
72  
73              case UNSUBACK:
74              case PUBACK:
75              case PUBREC:
76              case PUBREL:
77              case PUBCOMP:
78                  return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);
79  
80              case PINGREQ:
81              case PINGRESP:
82              case DISCONNECT:
83                  return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);
84  
85              default:
86                  throw new IllegalArgumentException(
87                          "Unknown message type: " + message.fixedHeader().messageType().value());
88          }
89      }
90  
91      private static ByteBuf encodeConnectMessage(
92              ByteBufAllocator byteBufAllocator,
93              MqttConnectMessage message) {
94          int payloadBufferSize = 0;
95  
96          MqttFixedHeader mqttFixedHeader = message.fixedHeader();
97          MqttConnectVariableHeader variableHeader = message.variableHeader();
98          MqttConnectPayload payload = message.payload();
99          MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
100                 (byte) variableHeader.version());
101 
102         // Client id
103         String clientIdentifier = payload.clientIdentifier();
104         if (!isValidClientId(mqttVersion, clientIdentifier)) {
105             throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
106         }
107         byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
108         payloadBufferSize += 2 + clientIdentifierBytes.length;
109 
110         // Will topic and message
111         String willTopic = payload.willTopic();
112         byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EMPTY;
113         String willMessage = payload.willMessage();
114         byte[] willMessageBytes = willMessage != null ? encodeStringUtf8(willMessage) : EMPTY;
115         if (variableHeader.isWillFlag()) {
116             payloadBufferSize += 2 + willTopicBytes.length;
117             payloadBufferSize += 2 + willMessageBytes.length;
118         }
119 
120         String userName = payload.userName();
121         byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EMPTY;
122         if (variableHeader.hasUserName()) {
123             payloadBufferSize += 2 + userNameBytes.length;
124         }
125 
126         String password = payload.password();
127         byte[] passwordBytes = password != null ? encodeStringUtf8(password) : EMPTY;
128         if (variableHeader.hasPassword()) {
129             payloadBufferSize += 2 + passwordBytes.length;
130         }
131 
132         // Fixed header
133         byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
134         int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
135         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
136         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
137         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
138         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
139         writeVariableLengthInt(buf, variablePartSize);
140 
141         buf.writeShort(protocolNameBytes.length);
142         buf.writeBytes(protocolNameBytes);
143 
144         buf.writeByte(variableHeader.version());
145         buf.writeByte(getConnVariableHeaderFlag(variableHeader));
146         buf.writeShort(variableHeader.keepAliveTimeSeconds());
147 
148         // Payload
149         buf.writeShort(clientIdentifierBytes.length);
150         buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
151         if (variableHeader.isWillFlag()) {
152             buf.writeShort(willTopicBytes.length);
153             buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
154             buf.writeShort(willMessageBytes.length);
155             buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
156         }
157         if (variableHeader.hasUserName()) {
158             buf.writeShort(userNameBytes.length);
159             buf.writeBytes(userNameBytes, 0, userNameBytes.length);
160         }
161         if (variableHeader.hasPassword()) {
162             buf.writeShort(passwordBytes.length);
163             buf.writeBytes(passwordBytes, 0, passwordBytes.length);
164         }
165         return buf;
166     }
167 
168     private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
169         int flagByte = 0;
170         if (variableHeader.hasUserName()) {
171             flagByte |= 0x80;
172         }
173         if (variableHeader.hasPassword()) {
174             flagByte |= 0x40;
175         }
176         if (variableHeader.isWillRetain()) {
177             flagByte |= 0x20;
178         }
179         flagByte |= (variableHeader.willQos() & 0x03) << 3;
180         if (variableHeader.isWillFlag()) {
181             flagByte |= 0x04;
182         }
183         if (variableHeader.isCleanSession()) {
184             flagByte |= 0x02;
185         }
186         return flagByte;
187     }
188 
189     private static ByteBuf encodeConnAckMessage(
190             ByteBufAllocator byteBufAllocator,
191             MqttConnAckMessage message) {
192         ByteBuf buf = byteBufAllocator.buffer(4);
193         buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
194         buf.writeByte(2);
195         buf.writeByte(0);
196         buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
197 
198         return buf;
199     }
200 
201     private static ByteBuf encodeSubscribeMessage(
202             ByteBufAllocator byteBufAllocator,
203             MqttSubscribeMessage message) {
204         int variableHeaderBufferSize = 2;
205         int payloadBufferSize = 0;
206 
207         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
208         MqttMessageIdVariableHeader variableHeader = message.variableHeader();
209         MqttSubscribePayload payload = message.payload();
210 
211         for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
212             String topicName = topic.topicName();
213             byte[] topicNameBytes = encodeStringUtf8(topicName);
214             payloadBufferSize += 2 + topicNameBytes.length;
215             payloadBufferSize += 1;
216         }
217 
218         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
219         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
220 
221         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
222         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
223         writeVariableLengthInt(buf, variablePartSize);
224 
225         // Variable Header
226         int messageId = variableHeader.messageId();
227         buf.writeShort(messageId);
228 
229         // Payload
230         for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
231             String topicName = topic.topicName();
232             byte[] topicNameBytes = encodeStringUtf8(topicName);
233             buf.writeShort(topicNameBytes.length);
234             buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
235             buf.writeByte(topic.qualityOfService().value());
236         }
237 
238         return buf;
239     }
240 
241     private static ByteBuf encodeUnsubscribeMessage(
242             ByteBufAllocator byteBufAllocator,
243             MqttUnsubscribeMessage message) {
244         int variableHeaderBufferSize = 2;
245         int payloadBufferSize = 0;
246 
247         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
248         MqttMessageIdVariableHeader variableHeader = message.variableHeader();
249         MqttUnsubscribePayload payload = message.payload();
250 
251         for (String topicName : payload.topics()) {
252             byte[] topicNameBytes = encodeStringUtf8(topicName);
253             payloadBufferSize += 2 + topicNameBytes.length;
254         }
255 
256         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
257         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
258 
259         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
260         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
261         writeVariableLengthInt(buf, variablePartSize);
262 
263         // Variable Header
264         int messageId = variableHeader.messageId();
265         buf.writeShort(messageId);
266 
267         // Payload
268         for (String topicName : payload.topics()) {
269             byte[] topicNameBytes = encodeStringUtf8(topicName);
270             buf.writeShort(topicNameBytes.length);
271             buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
272         }
273 
274         return buf;
275     }
276 
277     private static ByteBuf encodeSubAckMessage(
278             ByteBufAllocator byteBufAllocator,
279             MqttSubAckMessage message) {
280         int variableHeaderBufferSize = 2;
281         int payloadBufferSize = message.payload().grantedQoSLevels().size();
282         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
283         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
284         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
285         buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
286         writeVariableLengthInt(buf, variablePartSize);
287         buf.writeShort(message.variableHeader().messageId());
288         for (int qos : message.payload().grantedQoSLevels()) {
289             buf.writeByte(qos);
290         }
291 
292         return buf;
293     }
294 
295     private static ByteBuf encodePublishMessage(
296             ByteBufAllocator byteBufAllocator,
297             MqttPublishMessage message) {
298         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
299         MqttPublishVariableHeader variableHeader = message.variableHeader();
300         ByteBuf payload = message.payload().duplicate();
301 
302         String topicName = variableHeader.topicName();
303         byte[] topicNameBytes = encodeStringUtf8(topicName);
304 
305         int variableHeaderBufferSize = 2 + topicNameBytes.length +
306                 (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
307         int payloadBufferSize = payload.readableBytes();
308         int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
309         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
310 
311         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
312         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
313         writeVariableLengthInt(buf, variablePartSize);
314         buf.writeShort(topicNameBytes.length);
315         buf.writeBytes(topicNameBytes);
316         if (mqttFixedHeader.qosLevel().value() > 0) {
317             buf.writeShort(variableHeader.messageId());
318         }
319         buf.writeBytes(payload);
320 
321         return buf;
322     }
323 
324     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
325             ByteBufAllocator byteBufAllocator,
326             MqttMessage message) {
327         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
328         MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
329         int msgId = variableHeader.messageId();
330 
331         int variableHeaderBufferSize = 2; // variable part only has a message id
332         int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
333         ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
334         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
335         writeVariableLengthInt(buf, variableHeaderBufferSize);
336         buf.writeShort(msgId);
337 
338         return buf;
339     }
340 
341     private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
342             ByteBufAllocator byteBufAllocator,
343             MqttMessage message) {
344         MqttFixedHeader mqttFixedHeader = message.fixedHeader();
345         ByteBuf buf = byteBufAllocator.buffer(2);
346         buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
347         buf.writeByte(0);
348 
349         return buf;
350     }
351 
352     private static int getFixedHeaderByte1(MqttFixedHeader header) {
353         int ret = 0;
354         ret |= header.messageType().value() << 4;
355         if (header.isDup()) {
356             ret |= 0x08;
357         }
358         ret |= header.qosLevel().value() << 1;
359         if (header.isRetain()) {
360             ret |= 0x01;
361         }
362         return ret;
363     }
364 
365     private static void writeVariableLengthInt(ByteBuf buf, int num) {
366         do {
367             int digit = num % 128;
368             num /= 128;
369             if (num > 0) {
370                 digit |= 0x80;
371             }
372             buf.writeByte(digit);
373         } while (num > 0);
374     }
375 
376     private static int getVariableLengthInt(int num) {
377         int count = 0;
378         do {
379             num /= 128;
380             count++;
381         } while (num > 0);
382         return count;
383     }
384 
385     private static byte[] encodeStringUtf8(String s) {
386       return s.getBytes(CharsetUtil.UTF_8);
387     }
388 }