社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
开发环境:IDE 为 IntelliJ IDEA,MQTT broker 为 Mosquitto
<?xml version="1.0" encoding="UTF-8"?>
<!-- Aggregator POM: groups the server and device modules into one multi-module build. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of the aggregator itself -->
    <groupId>com.starnet</groupId>
    <artifactId>mqtt</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- "pom" packaging: this project produces no artifact of its own -->
    <packaging>pom</packaging>

    <!-- Sub-projects built together with this POM -->
    <modules>
        <module>server</module>
        <module>device</module>
    </modules>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor of the "server" module: a Spring Boot application that talks to an MQTT broker. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.starnet</groupId>
    <artifactId>server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>server</name>
    <packaging>jar</packaging>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot starters (versions managed by the parent POM) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test-only dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- MQTT -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <!-- Utility libraries -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <!-- Raised from 1.2.56: 1.2.83 is the 1.x release the maintainers recommend
                 for the published autoType deserialization vulnerabilities. -->
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
# Embedded web server settings
server:
  servlet:
    context-path: /server
  port: 8090

# Redis connection used by the application
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456

# Custom properties used when connecting to the MQTT broker
# (bound to the class annotated with prefix "mqtt-properties")
mqtt-properties:
  host: tcp://127.0.0.1:1883
  clientId: Server
  username: mosquitto
  password:
  cleanSession: false
  connectionTimeout: 300
  keepAliveInterval: 60
  # When set to true, the client reconnects automatically after the connection is lost
  automaticReconnect: true
读取上述 mqtt-properties 属性配置的配置类(MqttProperties):
package com.starnet.server.common;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * MqttProperties
 * Settings the MQTT client needs in order to connect to the broker.
 * Values are bound from configuration entries under the "mqtt-properties" prefix;
 * accessors are generated by Lombok's {@code @Data}.
 * @author wzzfarewell
 * @date 2019/12/4
 **/
@Component
@ConfigurationProperties(prefix = "mqtt-properties")
@Data
public class MqttProperties {
// Broker address handed to the MqttClient constructor (the sample config uses a tcp:// URI)
private String host;
// Client identifier handed to the MqttClient constructor; also used in log messages
private String clientId;
// User name applied to the connect options
private String username;
// Password applied to the connect options
private String password;
// Value passed to MqttConnectOptions.setCleanSession
private Boolean cleanSession;
// Value passed to MqttConnectOptions.setConnectionTimeout (presumably seconds - confirm against Paho docs)
private Integer connectionTimeout;
// Value passed to MqttConnectOptions.setKeepAliveInterval (presumably seconds - confirm against Paho docs)
private Integer keepAliveInterval;
// When true, the client reconnects automatically after the connection is lost
private Boolean automaticReconnect;
}
package com.starnet.server.mqtt;
import com.starnet.server.common.Const;
import com.starnet.server.common.MqttProperties;
import com.starnet.server.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* MqttPushClient
* 创建一个MQTT服务器客户端,因为服务器只有一个,单例模式创建实例
* @author wzzfarewell
* @date 2019/12/4
**/
@Slf4j
public class MqttPushClient {
private MqttClient client;
private MqttServiceImpl mqttService;
private MqttProperties mqttProperties;
private static volatile MqttPushClient instance = null;
/**
* 创建一个双重锁的客户端单例
* @param mqttService 业务处理对象
* @return MQTT客户端实例
*/
public static MqttPushClient getInstance(MqttServiceImpl mqttService){
if(instance == null){
synchronized (MqttPushClient.class){
if(instance == null){
instance = new MqttPushClient(mqttService);
}
}
}
return instance;
}
private MqttPushClient(MqttServiceImpl mqttService) {
log.info("MQTT客户端[{}]连接", mqttService.getMqttProperties().getClientId());
this.mqttService = mqttService;
this.mqttProperties = mqttService.getMqttProperties();
connect();
}
/**
* 客户端连接到MQTT服务器
*/
private void connect(){
try {
client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(),
new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(mqttProperties.getCleanSession());
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
options.setAutomaticReconnect(mqttProperties.getAutomaticReconnect());
options.setWill(Const.DEVICE_WILL_TOPIC, "ServerOffline".getBytes(), 2, true);
// 将业务处理对象赋给回调类
client.setCallback(new PushCallback(mqttService));
client.connect(options);
} catch (MqttException e) {
log.error("mqtt客户端[{}]连接异常:{}", mqttProperties.getClientId(), e.toString());
}
}
public void publish(String topic, String data) {
publish(topic, data, 1, false);
}
public void publish(String topic, String data, Integer qos, Boolean retained){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(data.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
MqttTopic mqttTopic = client.getTopic(topic);
try {
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
log.info("[{}]发布了主题:[{}],消息:[{}]", mqttProperties.getClientId(), topic, data);
} catch (Exception e) {
log.error("发布消息异常:{}", e.toString());
}
}
public void subscribe(String topic){
subscribe(topic, 1);
}
public void subscribe(String topic, int qos){
try {
client.subscribe(topic, qos);
log.info("{}订阅了主题:[{}], QOS[{}]", mqttProperties.getClientId(), topic, qos);
} catch (MqttException e) {
log.error("客户端订阅异常:{}", e.toString());
}
}
public void disconnect(){
try {
client.disconnect();
log.info("[{}]主动断开连接", mqttProperties.getClientId());
} catch (MqttException e) {
log.error("主动断开连接异常:{}", e.toString());
}
}
}
3.2. PushCallback:MQTT客户端MqttClient的回调,在里面可以处理接收到的消息
package com.starnet.server.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.starnet.server.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.ArrayList;
import java.util.List;
/**
* <h3>PushCallback
* 消息的回调类</h3>
* <p>
* 必须实现MqttCallback的接口并实现对应的相关接口方法。<br/>
* MqttCallbackExtended接口继承了MqttCallBack接口,增加了一个方法。<br/>
* 每个客户机标识都需要一个回调实例。
*
* @author wzzfarewell
* @date 2019/11/18
**/
@Slf4j
public class PushCallback implements MqttCallbackExtended {
private MqttServiceImpl mqttService;
public PushCallback(MqttServiceImpl mqttService) {
this.mqttService = mqttService;
}
/**
* 在这里处理接收到的消息。
*
* @param s 消息主题
* @param mqttMessage 消息对象
* @throws Exception
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// 订阅之后的消息执行到这里
String payload = new String(mqttMessage.getPayload());
log.info("主题:[{}],内容: [{}]", s, payload);
}
/**
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
*
* @param iMqttDeliveryToken 消息的传递令牌
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发送完成?--->[{}]", iMqttDeliveryToken.isComplete());
}
/**
* 此方法在客户端连接断开之后调用
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("连接断开,可以重连...");
}
/**
* 此方法在客户端连接成功之后调用
*
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
// 重连后重新订阅之前订阅的topic
String key = mqttService.getMqttProperties().getClientId() + "_Topics";
RedisTemplate<String, String> redisTemplate
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_38263130/article/details/103785803
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!