1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.mqtt;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.util.CharsetUtil;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.List;
27
28 public final class MqttMessageBuilders {
29
30 public static final class PublishBuilder {
31 private String topic;
32 private boolean retained;
33 private MqttQoS qos;
34 private ByteBuf payload;
35 private int messageId;
36 private MqttProperties mqttProperties;
37
38 PublishBuilder() {
39 }
40
41 public PublishBuilder topicName(String topic) {
42 this.topic = topic;
43 return this;
44 }
45
46 public PublishBuilder retained(boolean retained) {
47 this.retained = retained;
48 return this;
49 }
50
51 public PublishBuilder qos(MqttQoS qos) {
52 this.qos = qos;
53 return this;
54 }
55
56 public PublishBuilder payload(ByteBuf payload) {
57 this.payload = payload;
58 return this;
59 }
60
61 public PublishBuilder messageId(int messageId) {
62 this.messageId = messageId;
63 return this;
64 }
65
66 public PublishBuilder properties(MqttProperties properties) {
67 this.mqttProperties = properties;
68 return this;
69 }
70
71 public MqttPublishMessage build() {
72 MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0);
73 MqttPublishVariableHeader mqttVariableHeader =
74 new MqttPublishVariableHeader(topic, messageId, mqttProperties);
75 return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, Unpooled.buffer().writeBytes(payload));
76 }
77 }
78
79 public static final class ConnectBuilder {
80
81 private MqttVersion version = MqttVersion.MQTT_3_1_1;
82 private String clientId;
83 private boolean cleanSession;
84 private boolean hasUser;
85 private boolean hasPassword;
86 private int keepAliveSecs;
87 private MqttProperties willProperties = MqttProperties.NO_PROPERTIES;
88 private boolean willFlag;
89 private boolean willRetain;
90 private MqttQoS willQos = MqttQoS.AT_MOST_ONCE;
91 private String willTopic;
92 private byte[] willMessage;
93 private String username;
94 private byte[] password;
95 private MqttProperties properties = MqttProperties.NO_PROPERTIES;
96
97 ConnectBuilder() {
98 }
99
100 public ConnectBuilder protocolVersion(MqttVersion version) {
101 this.version = version;
102 return this;
103 }
104
105 public ConnectBuilder clientId(String clientId) {
106 this.clientId = clientId;
107 return this;
108 }
109
110 public ConnectBuilder cleanSession(boolean cleanSession) {
111 this.cleanSession = cleanSession;
112 return this;
113 }
114
115 public ConnectBuilder keepAlive(int keepAliveSecs) {
116 this.keepAliveSecs = keepAliveSecs;
117 return this;
118 }
119
120 public ConnectBuilder willFlag(boolean willFlag) {
121 this.willFlag = willFlag;
122 return this;
123 }
124
125 public ConnectBuilder willQoS(MqttQoS willQos) {
126 this.willQos = willQos;
127 return this;
128 }
129
130 public ConnectBuilder willTopic(String willTopic) {
131 this.willTopic = willTopic;
132 return this;
133 }
134
135
136
137
138 @Deprecated
139 public ConnectBuilder willMessage(String willMessage) {
140 willMessage(willMessage == null ? null : willMessage.getBytes(CharsetUtil.UTF_8));
141 return this;
142 }
143
144 public ConnectBuilder willMessage(byte[] willMessage) {
145 this.willMessage = willMessage;
146 return this;
147 }
148
149 public ConnectBuilder willRetain(boolean willRetain) {
150 this.willRetain = willRetain;
151 return this;
152 }
153
154 public ConnectBuilder willProperties(MqttProperties willProperties) {
155 this.willProperties = willProperties;
156 return this;
157 }
158
159 public ConnectBuilder hasUser(boolean value) {
160 this.hasUser = value;
161 return this;
162 }
163
164 public ConnectBuilder hasPassword(boolean value) {
165 this.hasPassword = value;
166 return this;
167 }
168
169 public ConnectBuilder username(String username) {
170 this.hasUser = username != null;
171 this.username = username;
172 return this;
173 }
174
175
176
177
178 @Deprecated
179 public ConnectBuilder password(String password) {
180 password(password == null ? null : password.getBytes(CharsetUtil.UTF_8));
181 return this;
182 }
183
184 public ConnectBuilder password(byte[] password) {
185 this.hasPassword = password != null;
186 this.password = password;
187 return this;
188 }
189
190 public ConnectBuilder properties(MqttProperties properties) {
191 this.properties = properties;
192 return this;
193 }
194
195 public MqttConnectMessage build() {
196 MqttFixedHeader mqttFixedHeader =
197 new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
198 MqttConnectVariableHeader mqttConnectVariableHeader =
199 new MqttConnectVariableHeader(
200 version.protocolName(),
201 version.protocolLevel(),
202 hasUser,
203 hasPassword,
204 willRetain,
205 willQos.value(),
206 willFlag,
207 cleanSession,
208 keepAliveSecs,
209 properties);
210 MqttConnectPayload mqttConnectPayload =
211 new MqttConnectPayload(clientId, willProperties, willTopic, willMessage, username, password);
212 return new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
213 }
214 }
215
216 public static final class SubscribeBuilder {
217
218 private List<MqttTopicSubscription> subscriptions;
219 private int messageId;
220 private MqttProperties properties;
221
222 SubscribeBuilder() {
223 }
224
225 public SubscribeBuilder addSubscription(MqttQoS qos, String topic) {
226 ensureSubscriptionsExist();
227 subscriptions.add(new MqttTopicSubscription(topic, qos));
228 return this;
229 }
230
231 public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) {
232 ensureSubscriptionsExist();
233 subscriptions.add(new MqttTopicSubscription(topic, option));
234 return this;
235 }
236
237 public SubscribeBuilder messageId(int messageId) {
238 this.messageId = messageId;
239 return this;
240 }
241
242 public SubscribeBuilder properties(MqttProperties properties) {
243 this.properties = properties;
244 return this;
245 }
246
247 public MqttSubscribeMessage build() {
248 MqttFixedHeader mqttFixedHeader =
249 new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
250 MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader =
251 new MqttMessageIdAndPropertiesVariableHeader(messageId, properties);
252 MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions);
253 return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
254 }
255
256 private void ensureSubscriptionsExist() {
257 if (subscriptions == null) {
258 subscriptions = new ArrayList<MqttTopicSubscription>(5);
259 }
260 }
261 }
262
263 public static final class UnsubscribeBuilder {
264
265 private List<String> topicFilters;
266 private int messageId;
267 private MqttProperties properties;
268
269 UnsubscribeBuilder() {
270 }
271
272 public UnsubscribeBuilder addTopicFilter(String topic) {
273 if (topicFilters == null) {
274 topicFilters = new ArrayList<String>(5);
275 }
276 topicFilters.add(topic);
277 return this;
278 }
279
280 public UnsubscribeBuilder messageId(int messageId) {
281 this.messageId = messageId;
282 return this;
283 }
284
285 public UnsubscribeBuilder properties(MqttProperties properties) {
286 this.properties = properties;
287 return this;
288 }
289
290 public MqttUnsubscribeMessage build() {
291 MqttFixedHeader mqttFixedHeader =
292 new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
293 MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader =
294 new MqttMessageIdAndPropertiesVariableHeader(messageId, properties);
295 MqttUnsubscribePayload mqttSubscribePayload = new MqttUnsubscribePayload(topicFilters);
296 return new MqttUnsubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
297 }
298 }
299
/**
 * Callback that configures a builder in place.
 *
 * @param <T> type of the builder handed to {@link #apply(Object)}
 */
public interface PropertiesInitializer<T> {

    /**
     * Applies the desired settings to the given builder.
     *
     * @param builder builder to configure
     */
    void apply(T builder);
}
303
304 public static final class ConnAckBuilder {
305
306 private MqttConnectReturnCode returnCode;
307 private boolean sessionPresent;
308 private MqttProperties properties = MqttProperties.NO_PROPERTIES;
309 private ConnAckPropertiesBuilder propsBuilder;
310
311 private ConnAckBuilder() {
312 }
313
314 public ConnAckBuilder returnCode(MqttConnectReturnCode returnCode) {
315 this.returnCode = returnCode;
316 return this;
317 }
318
319 public ConnAckBuilder sessionPresent(boolean sessionPresent) {
320 this.sessionPresent = sessionPresent;
321 return this;
322 }
323
324 public ConnAckBuilder properties(MqttProperties properties) {
325 this.properties = properties;
326 return this;
327 }
328
329 public ConnAckBuilder properties(PropertiesInitializer<ConnAckPropertiesBuilder> consumer) {
330 if (propsBuilder == null) {
331 propsBuilder = new ConnAckPropertiesBuilder();
332 }
333 consumer.apply(propsBuilder);
334 return this;
335 }
336
337 public MqttConnAckMessage build() {
338 if (propsBuilder != null) {
339 properties = propsBuilder.build();
340 }
341 MqttFixedHeader mqttFixedHeader =
342 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
343 MqttConnAckVariableHeader mqttConnAckVariableHeader =
344 new MqttConnAckVariableHeader(returnCode, sessionPresent, properties);
345 return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
346 }
347 }
348
349 public static final class ConnAckPropertiesBuilder {
350 private String clientId;
351 private Long sessionExpiryInterval;
352 private int receiveMaximum;
353 private Byte maximumQos;
354 private boolean retain;
355 private Long maximumPacketSize;
356 private int topicAliasMaximum;
357 private String reasonString;
358 private final MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties();
359 private Boolean wildcardSubscriptionAvailable;
360 private Boolean subscriptionIdentifiersAvailable;
361 private Boolean sharedSubscriptionAvailable;
362 private Integer serverKeepAlive;
363 private String responseInformation;
364 private String serverReference;
365 private String authenticationMethod;
366 private byte[] authenticationData;
367
368 public MqttProperties build() {
369 final MqttProperties props = new MqttProperties();
370 if (clientId != null) {
371 props.add(new MqttProperties.StringProperty(MqttProperties.ASSIGNED_CLIENT_IDENTIFIER,
372 clientId));
373 }
374 if (sessionExpiryInterval != null) {
375 props.add(new MqttProperties.IntegerProperty(
376 MqttProperties.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval.intValue()));
377 }
378 if (receiveMaximum > 0) {
379 props.add(new MqttProperties.IntegerProperty(MqttProperties.RECEIVE_MAXIMUM, receiveMaximum));
380 }
381 if (maximumQos != null) {
382 props.add(new MqttProperties.IntegerProperty(MqttProperties.MAXIMUM_QOS,
383 maximumQos.intValue()));
384 }
385 props.add(new MqttProperties.IntegerProperty(MqttProperties.RETAIN_AVAILABLE, retain ? 1 : 0));
386 if (maximumPacketSize != null) {
387 props.add(new MqttProperties.IntegerProperty(MqttProperties.MAXIMUM_PACKET_SIZE,
388 maximumPacketSize.intValue()));
389 }
390 props.add(new MqttProperties.IntegerProperty(MqttProperties.TOPIC_ALIAS_MAXIMUM,
391 topicAliasMaximum));
392 if (reasonString != null) {
393 props.add(new MqttProperties.StringProperty(MqttProperties.REASON_STRING, reasonString));
394 }
395 props.add(userProperties);
396 if (wildcardSubscriptionAvailable != null) {
397 props.add(new MqttProperties.IntegerProperty(MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE,
398 wildcardSubscriptionAvailable ? 1 : 0));
399 }
400 if (subscriptionIdentifiersAvailable != null) {
401 props.add(new MqttProperties.IntegerProperty(MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE,
402 subscriptionIdentifiersAvailable ? 1 : 0));
403 }
404 if (sharedSubscriptionAvailable != null) {
405 props.add(new MqttProperties.IntegerProperty(MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE,
406 sharedSubscriptionAvailable ? 1 : 0));
407 }
408 if (serverKeepAlive != null) {
409 props.add(new MqttProperties.IntegerProperty(MqttProperties.SERVER_KEEP_ALIVE,
410 serverKeepAlive));
411 }
412 if (responseInformation != null) {
413 props.add(new MqttProperties.StringProperty(MqttProperties.RESPONSE_INFORMATION,
414 responseInformation));
415 }
416 if (serverReference != null) {
417 props.add(new MqttProperties.StringProperty(MqttProperties.SERVER_REFERENCE,
418 serverReference));
419 }
420 if (authenticationMethod != null) {
421 props.add(new MqttProperties.StringProperty(MqttProperties.AUTHENTICATION_METHOD,
422 authenticationMethod));
423 }
424 if (authenticationData != null) {
425 props.add(new MqttProperties.BinaryProperty(MqttProperties.AUTHENTICATION_DATA,
426 authenticationData));
427 }
428
429 return props;
430 }
431
432 public ConnAckPropertiesBuilder sessionExpiryInterval(long seconds) {
433 this.sessionExpiryInterval = seconds;
434 return this;
435 }
436
437 public ConnAckPropertiesBuilder receiveMaximum(int value) {
438 this.receiveMaximum = checkPositive(value, "value");
439 return this;
440 }
441
442 public ConnAckPropertiesBuilder maximumQos(byte value) {
443 if (value != 0 && value != 1) {
444 throw new IllegalArgumentException("maximum QoS property could be 0 or 1");
445 }
446 this.maximumQos = value;
447 return this;
448 }
449
450 public ConnAckPropertiesBuilder retainAvailable(boolean retain) {
451 this.retain = retain;
452 return this;
453 }
454
455 public ConnAckPropertiesBuilder maximumPacketSize(long size) {
456 this.maximumPacketSize = checkPositive(size, "size");
457 return this;
458 }
459
460 public ConnAckPropertiesBuilder assignedClientId(String clientId) {
461 this.clientId = clientId;
462 return this;
463 }
464
465 public ConnAckPropertiesBuilder topicAliasMaximum(int value) {
466 this.topicAliasMaximum = value;
467 return this;
468 }
469
470 public ConnAckPropertiesBuilder reasonString(String reason) {
471 this.reasonString = reason;
472 return this;
473 }
474
475 public ConnAckPropertiesBuilder userProperty(String name, String value) {
476 userProperties.add(name, value);
477 return this;
478 }
479
480 public ConnAckPropertiesBuilder wildcardSubscriptionAvailable(boolean value) {
481 this.wildcardSubscriptionAvailable = value;
482 return this;
483 }
484
485 public ConnAckPropertiesBuilder subscriptionIdentifiersAvailable(boolean value) {
486 this.subscriptionIdentifiersAvailable = value;
487 return this;
488 }
489
490 public ConnAckPropertiesBuilder sharedSubscriptionAvailable(boolean value) {
491 this.sharedSubscriptionAvailable = value;
492 return this;
493 }
494
495 public ConnAckPropertiesBuilder serverKeepAlive(int seconds) {
496 this.serverKeepAlive = seconds;
497 return this;
498 }
499
500 public ConnAckPropertiesBuilder responseInformation(String value) {
501 this.responseInformation = value;
502 return this;
503 }
504
505 public ConnAckPropertiesBuilder serverReference(String host) {
506 this.serverReference = host;
507 return this;
508 }
509
510 public ConnAckPropertiesBuilder authenticationMethod(String methodName) {
511 this.authenticationMethod = methodName;
512 return this;
513 }
514
515 public ConnAckPropertiesBuilder authenticationData(byte[] rawData) {
516 this.authenticationData = rawData.clone();
517 return this;
518 }
519 }
520
521 public static final class PubAckBuilder {
522
523 private int packetId;
524 private byte reasonCode;
525 private MqttProperties properties;
526
527 PubAckBuilder() {
528 }
529
530 public PubAckBuilder reasonCode(byte reasonCode) {
531 this.reasonCode = reasonCode;
532 return this;
533 }
534
535 public PubAckBuilder packetId(int packetId) {
536 this.packetId = packetId;
537 return this;
538 }
539
540
541
542
543 @Deprecated
544 public PubAckBuilder packetId(short packetId) {
545 return packetId(packetId & 0xFFFF);
546 }
547
548 public PubAckBuilder properties(MqttProperties properties) {
549 this.properties = properties;
550 return this;
551 }
552
553 public MqttMessage build() {
554 MqttFixedHeader mqttFixedHeader =
555 new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
556 MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader =
557 new MqttPubReplyMessageVariableHeader(packetId, reasonCode, properties);
558 return new MqttMessage(mqttFixedHeader, mqttPubAckVariableHeader);
559 }
560 }
561
562 public static final class SubAckBuilder {
563
564 private int packetId;
565 private MqttProperties properties;
566 private final List<MqttQoS> grantedQoses = new ArrayList<MqttQoS>();
567
568 SubAckBuilder() {
569 }
570
571 public SubAckBuilder packetId(int packetId) {
572 this.packetId = packetId;
573 return this;
574 }
575
576
577
578
579 @Deprecated
580 public SubAckBuilder packetId(short packetId) {
581 return packetId(packetId & 0xFFFF);
582 }
583
584 public SubAckBuilder properties(MqttProperties properties) {
585 this.properties = properties;
586 return this;
587 }
588
589 public SubAckBuilder addGrantedQos(MqttQoS qos) {
590 this.grantedQoses.add(qos);
591 return this;
592 }
593
594 public SubAckBuilder addGrantedQoses(MqttQoS... qoses) {
595 this.grantedQoses.addAll(Arrays.asList(qoses));
596 return this;
597 }
598
599 public MqttSubAckMessage build() {
600 MqttFixedHeader mqttFixedHeader =
601 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
602 MqttMessageIdAndPropertiesVariableHeader mqttSubAckVariableHeader =
603 new MqttMessageIdAndPropertiesVariableHeader(packetId, properties);
604
605
606 int[] grantedQoses = new int[this.grantedQoses.size()];
607 int i = 0;
608 for (MqttQoS grantedQos : this.grantedQoses) {
609 grantedQoses[i++] = grantedQos.value();
610 }
611
612 MqttSubAckPayload subAckPayload = new MqttSubAckPayload(grantedQoses);
613 return new MqttSubAckMessage(mqttFixedHeader, mqttSubAckVariableHeader, subAckPayload);
614 }
615 }
616
617 public static final class UnsubAckBuilder {
618
619 private int packetId;
620 private MqttProperties properties;
621 private final List<Short> reasonCodes = new ArrayList<Short>();
622
623 UnsubAckBuilder() {
624 }
625
626 public UnsubAckBuilder packetId(int packetId) {
627 this.packetId = packetId;
628 return this;
629 }
630
631
632
633
634 @Deprecated
635 public UnsubAckBuilder packetId(short packetId) {
636 return packetId(packetId & 0xFFFF);
637 }
638
639 public UnsubAckBuilder properties(MqttProperties properties) {
640 this.properties = properties;
641 return this;
642 }
643
644 public UnsubAckBuilder addReasonCode(short reasonCode) {
645 this.reasonCodes.add(reasonCode);
646 return this;
647 }
648
649 public UnsubAckBuilder addReasonCodes(Short... reasonCodes) {
650 this.reasonCodes.addAll(Arrays.asList(reasonCodes));
651 return this;
652 }
653
654 public MqttUnsubAckMessage build() {
655 MqttFixedHeader mqttFixedHeader =
656 new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
657 MqttMessageIdAndPropertiesVariableHeader mqttSubAckVariableHeader =
658 new MqttMessageIdAndPropertiesVariableHeader(packetId, properties);
659
660 MqttUnsubAckPayload subAckPayload = new MqttUnsubAckPayload(reasonCodes);
661 return new MqttUnsubAckMessage(mqttFixedHeader, mqttSubAckVariableHeader, subAckPayload);
662 }
663 }
664
665 public static final class DisconnectBuilder {
666
667 private MqttProperties properties;
668 private byte reasonCode;
669
670 DisconnectBuilder() {
671 }
672
673 public DisconnectBuilder properties(MqttProperties properties) {
674 this.properties = properties;
675 return this;
676 }
677
678 public DisconnectBuilder reasonCode(byte reasonCode) {
679 this.reasonCode = reasonCode;
680 return this;
681 }
682
683 public MqttMessage build() {
684 MqttFixedHeader mqttFixedHeader =
685 new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
686 MqttReasonCodeAndPropertiesVariableHeader mqttDisconnectVariableHeader =
687 new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
688
689 return new MqttMessage(mqttFixedHeader, mqttDisconnectVariableHeader);
690 }
691 }
692
693 public static final class AuthBuilder {
694
695 private MqttProperties properties;
696 private byte reasonCode;
697
698 AuthBuilder() {
699 }
700
701 public AuthBuilder properties(MqttProperties properties) {
702 this.properties = properties;
703 return this;
704 }
705
706 public AuthBuilder reasonCode(byte reasonCode) {
707 this.reasonCode = reasonCode;
708 return this;
709 }
710
711 public MqttMessage build() {
712 MqttFixedHeader mqttFixedHeader =
713 new MqttFixedHeader(MqttMessageType.AUTH, false, MqttQoS.AT_MOST_ONCE, false, 0);
714 MqttReasonCodeAndPropertiesVariableHeader mqttAuthVariableHeader =
715 new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
716
717 return new MqttMessage(mqttFixedHeader, mqttAuthVariableHeader);
718 }
719 }
720
721 public static ConnectBuilder connect() {
722 return new ConnectBuilder();
723 }
724
725 public static ConnAckBuilder connAck() {
726 return new ConnAckBuilder();
727 }
728
729 public static PublishBuilder publish() {
730 return new PublishBuilder();
731 }
732
733 public static SubscribeBuilder subscribe() {
734 return new SubscribeBuilder();
735 }
736
737 public static UnsubscribeBuilder unsubscribe() {
738 return new UnsubscribeBuilder();
739 }
740
741 public static PubAckBuilder pubAck() {
742 return new PubAckBuilder();
743 }
744
745 public static SubAckBuilder subAck() {
746 return new SubAckBuilder();
747 }
748
749 public static UnsubAckBuilder unsubAck() {
750 return new UnsubAckBuilder();
751 }
752
753 public static DisconnectBuilder disconnect() {
754 return new DisconnectBuilder();
755 }
756
757 public static AuthBuilder auth() {
758 return new AuthBuilder();
759 }
760
/** Not instantiable: this class only hosts static factory methods and nested builders. */
private MqttMessageBuilders() {
    // no instances
}
763 }