1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.mqtt.heartBeat;
17
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelInboundHandlerAdapter;
20 import io.netty.handler.codec.mqtt.MqttConnectMessage;
21 import io.netty.handler.codec.mqtt.MqttConnectPayload;
22 import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
23 import io.netty.handler.codec.mqtt.MqttFixedHeader;
24 import io.netty.handler.codec.mqtt.MqttMessage;
25 import io.netty.handler.codec.mqtt.MqttMessageType;
26 import io.netty.handler.codec.mqtt.MqttQoS;
27 import io.netty.handler.codec.mqtt.MqttProperties;
28 import io.netty.handler.timeout.IdleStateEvent;
29 import io.netty.util.ReferenceCountUtil;
30
31 public class MqttHeartBeatClientHandler extends ChannelInboundHandlerAdapter {
32
33 private static final String PROTOCOL_NAME_MQTT_3_1_1 = "MQTT";
34 private static final int PROTOCOL_VERSION_MQTT_3_1_1 = 4;
35
36 private final String clientId;
37 private final String userName;
38 private final byte[] password;
39
40 public MqttHeartBeatClientHandler(String clientId, String userName, String password) {
41 this.clientId = clientId;
42 this.userName = userName;
43 this.password = password.getBytes();
44 }
45
46 @Override
47 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
48
49 ReferenceCountUtil.release(msg);
50 }
51
52 @Override
53 public void channelActive(ChannelHandlerContext ctx) throws Exception {
54 MqttFixedHeader connectFixedHeader =
55 new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
56 MqttConnectVariableHeader connectVariableHeader =
57 new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false,
58 0, false, false, 20, MqttProperties.NO_PROPERTIES);
59 MqttConnectPayload connectPayload = new MqttConnectPayload(clientId,
60 MqttProperties.NO_PROPERTIES,
61 null,
62 null,
63 userName,
64 password);
65 MqttConnectMessage connectMessage =
66 new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload);
67 ctx.writeAndFlush(connectMessage);
68 System.out.println("Sent CONNECT");
69 }
70
71 @Override
72 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
73 if (evt instanceof IdleStateEvent) {
74 MqttFixedHeader pingreqFixedHeader =
75 new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
76 MqttMessage pingreqMessage = new MqttMessage(pingreqFixedHeader);
77 ctx.writeAndFlush(pingreqMessage);
78 System.out.println("Sent PINGREQ");
79 } else {
80 super.userEventTriggered(ctx, evt);
81 }
82 }
83
84 @Override
85 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
86 cause.printStackTrace();
87 ctx.close();
88 }
89 }