Spring Boot连接MQTT服务器的Demo - Go语言中文社区

Spring Boot连接MQTT服务器的Demo


一、项目初始化

IDE:idea , MQTT broker: mosquitto

  1. 创建一个空的Maven项目:MQTT
    pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.starnet</groupId>
    <artifactId>mqtt</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>server</module>
        <module>device</module>
    </modules>
</project>
  1. 在MQTT中创建服务端module:server(Spring Boot项目):
    pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.starnet</groupId>
    <artifactId>server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>server</name>
    <packaging>jar</packaging>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--MQTT-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!--工具包-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
  1. 在MQTT中创建设备端module:device(Spring Boot项目),这里和服务端几乎一样,略过。

二、服务端实现

  1. 项目结构:
    在这里插入图片描述
  2. yml配置文件:
server:
  servlet:
    context-path: /server
  port: 8090
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456
# 自定义的连接mqtt broker时的属性配置
mqtt-properties:
  host: tcp://127.0.0.1:1883
  clientId: Server
  username: mosquitto
  password:
  cleanSession: false
  connectionTimeout: 300
  keepAliveInterval: 60
  # 此属性设置为true的话,连接断开会自动重连
  automaticReconnect: true

读取属性配置的config:

package com.starnet.server.common;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * MqttProperties
 * MQTT客户端连接服务器所需设置的一些属性
 * @author wzzfarewell
 * @date 2019/12/4
 **/
@Component
@ConfigurationProperties(prefix = "mqtt-properties")
@Data
public class MqttProperties {
    private String host;
    private String clientId;
    private String username;
    private String password;
    private Boolean cleanSession;
    private Integer connectionTimeout;
    private Integer keepAliveInterval;
    private Boolean automaticReconnect;

}
  1. 连接MQTT所需的类:
    3.1. MqttPushClient:其中包含了一个mqtt客户端MqttClient (org.eclipse.paho.client.mqttv3中的类)和MqttServiceImpl、MqttProperties(通过构造方法传入)
package com.starnet.server.mqtt;

import com.starnet.server.common.Const;
import com.starnet.server.common.MqttProperties;
import com.starnet.server.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * MqttPushClient
 * 创建一个MQTT服务器客户端,因为服务器只有一个,单例模式创建实例
 * @author wzzfarewell
 * @date 2019/12/4
 **/
@Slf4j
public class MqttPushClient {
    private MqttClient client;
    private MqttServiceImpl mqttService;
    private MqttProperties mqttProperties;
    private static volatile MqttPushClient instance = null;
    /**
     * 创建一个双重锁的客户端单例
     * @param mqttService 业务处理对象
     * @return MQTT客户端实例
     */
    public static MqttPushClient getInstance(MqttServiceImpl mqttService){
        if(instance == null){
            synchronized (MqttPushClient.class){
                if(instance == null){
                    instance = new MqttPushClient(mqttService);
                }
            }
        }
        return instance;
    }

    private MqttPushClient(MqttServiceImpl mqttService) {
        log.info("MQTT客户端[{}]连接", mqttService.getMqttProperties().getClientId());
        this.mqttService = mqttService;
        this.mqttProperties = mqttService.getMqttProperties();
        connect();
    }

    /**
     * 客户端连接到MQTT服务器
     */
    private void connect(){
        try {
            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(),
                    new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(mqttProperties.getCleanSession());
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
            options.setAutomaticReconnect(mqttProperties.getAutomaticReconnect());
            options.setWill(Const.DEVICE_WILL_TOPIC, "ServerOffline".getBytes(), 2, true);
            // 将业务处理对象赋给回调类
            client.setCallback(new PushCallback(mqttService));
            client.connect(options);
        } catch (MqttException e) {
            log.error("mqtt客户端[{}]连接异常:{}", mqttProperties.getClientId(), e.toString());
        }
    }

    public void publish(String topic, String data) {
        publish(topic, data, 1, false);
    }

    public void publish(String topic, String data, Integer qos, Boolean retained){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(data.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        MqttTopic mqttTopic = client.getTopic(topic);
        try {
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
            log.info("[{}]发布了主题:[{}],消息:[{}]", mqttProperties.getClientId(), topic, data);
        } catch (Exception e) {
            log.error("发布消息异常:{}", e.toString());
        }
    }

    public void subscribe(String topic){
        subscribe(topic, 1);
    }

    public void subscribe(String topic, int qos){
        try {
            client.subscribe(topic, qos);
            log.info("{}订阅了主题:[{}], QOS[{}]", mqttProperties.getClientId(), topic, qos);
        } catch (MqttException e) {
            log.error("客户端订阅异常:{}", e.toString());
        }
    }

    public void disconnect(){
        try {
            client.disconnect();
            log.info("[{}]主动断开连接", mqttProperties.getClientId());
        } catch (MqttException e) {
            log.error("主动断开连接异常:{}", e.toString());
        }
    }
}

3.2. PushCallback:MQTT客户端MqttClient的回调,在里面可以处理接收到的消息

package com.starnet.server.mqtt;

import com.alibaba.fastjson.JSONObject;
import com.starnet.server.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.ArrayList;
import java.util.List;

/**
 * <h3>PushCallback
 * 消息的回调类</h3>
 * <p>
 * 必须实现MqttCallback的接口并实现对应的相关接口方法。<br/>
 * MqttCallbackExtended接口继承了MqttCallBack接口,增加了一个方法。<br/>
 * 每个客户机标识都需要一个回调实例。
 *
 * @author wzzfarewell
 * @date 2019/11/18
 **/
@Slf4j
public class PushCallback implements MqttCallbackExtended {
    private MqttServiceImpl mqttService;

    public PushCallback(MqttServiceImpl mqttService) {
        this.mqttService = mqttService;
    }

    /**
     * 在这里处理接收到的消息。
     *
     * @param s           消息主题
     * @param mqttMessage 消息对象
     * @throws Exception
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // 订阅之后的消息执行到这里
        String payload = new String(mqttMessage.getPayload());
        log.info("主题:[{}],内容: [{}]", s, payload);
    }

    /**
     * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
     *
     * @param iMqttDeliveryToken 消息的传递令牌
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发送完成?--->[{}]", iMqttDeliveryToken.isComplete());
    }

    /**
     * 此方法在客户端连接断开之后调用
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("连接断开,可以重连...");
    }

    /**
     * 此方法在客户端连接成功之后调用
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        // 重连后重新订阅之前订阅的topic
        String key = mqttService.getMqttProperties().getClientId() + "_Topics";
        RedisTemplate<String, String> redisTemplate 
                            
                            版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_38263130/article/details/103785803
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢