JAVA使用ssl方式连接mqtt - Go语言中文社区

JAVA使用ssl方式连接mqtt


请确保你已经有证书了,且已经配置好了mqtt的ssl支持。还没有配置好的请先看之前的两篇文章:《自制CA证书,自制客户端、服务端证书》和《mqtt配置使用ssl加密通信》。按照这两篇文章操作之后,接下来就可以编写java代码来使用ssl方式连接mqtt了。
本文使用的是maven项目,所以需要先添加以下依赖:

		<!-- fastjson: JSON serialization; TestSSLPublish calls JSON.toJSONString to build the message payload. -->
		<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <!-- Lombok: supplies the @Slf4j, @Getter and @Setter annotations used by the callback and client classes.
             No version is declared here; presumably it is managed by a parent POM (confirm). -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Eclipse Paho MQTT v3 client: MqttClient, MqttConnectOptions and the callback interfaces. -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- Bouncy Castle: SslUtil uses its PEMReader / PasswordFinder classes to read the PEM files.
             NOTE(review): SslUtil depends on the PEMReader API of this version; verify before upgrading. -->
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.47</version>
        </dependency>

整个项目目录结构如图:

在这里插入图片描述
核心类是SslUtil.java,里面配置了用于连接mqtt的ca根证书以及客户端证书和密钥。下面列出上图中的各个文件的完整代码:
1.PublishCallback.java

package com.siplock.siplocklockservice.callbacks;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Callback installed on the publishing MQTT client. It only logs client events;
 * no recovery or message handling is performed here.
 */
@Slf4j
public class PublishCallback implements MqttCallbackExtended {

    /**
     * Called when the connection to the broker is lost.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Manual reconnect logic would normally go here. The cause is handed to the
        // logger so the reason for the disconnect is not silently discarded.
        log.warn("[PublishCallback] 连接断开", cause);
    }

    /**
     * Called when delivery of a published message has finished.
     *
     * @param token the delivery token of the message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------{}", token.isComplete());
    }

    /**
     * Called when a message arrives; this callback takes no action on inbound messages.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Intentionally empty.
    }

    /**
     * Called when the connection to the broker has been established.
     *
     * @param b reconnect flag passed in by the Paho client
     * @param s URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("connected");
    }

}

2.SubcribeCallBack.java

package com.siplock.siplocklockservice.callbacks;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class SubcribeCallBack implements MqttCallbackExtended {

    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("client lost connection,reconnecting");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

        try {
            // subscribe后得到的消息会执行到这里面
            log.info("接收消息主题 : " + s);
            log.info("接收消息Qos : " + mqttMessage.getQos());
            log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
            String str = mqttMessage.toString();
            log.info("从MQTT收到的消息为:" + str + " ;TOPIC:" + s);
        } catch (Exception e) {
            log.error("SubcribeCallBack error:" + e.getMessage());
            e.printStackTrace();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    @Override
    public void connectComplete(boolean b, String s) {
        log.info("receive connectted");
    }

}

3.MQTTPublishClient.java

package com.siplock.siplocklockservice.clients;

import com.siplock.siplocklockservice.callbacks.PublishCallback;
import com.siplock.siplocklockservice.util.SslUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MQTTPublishClient {
    //tcp://MQTT安装的服务器地址:MQTT定义的端口号
    public String HOST;
    //定义MQTT的ID,可以在MQTT服务配置中指定
    private String clientid ;

    private MqttClient client;
    @Setter
    @Getter
    private MqttTopic mqttTopic;

    /**
     * 构造函数
     * @throws MqttException
     */
    public MQTTPublishClient(String host,String serverId){
        log.info("MQTTPublishClient instance");
        HOST = host;
        clientid = serverId;
        // MemoryPersistence设置clientid的保存形式,默认为以内存保存
        try {
            if (client == null){
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        connect();
    }

    /**
     *  用来连接服务器
     */
    private void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // 设置超时时间
        options.setConnectionTimeout(20);
        // 设置会话心跳时间
        options.setKeepAliveInterval(10);
        options.setAutomaticReconnect(true);//设置自动重连
        try {
            options.setSocketFactory(SslUtil.getSocketFactory("/UserProfile/ca/ca.crt", "/UserProfile/ca/client.crt", "/UserProfile/ca/client.key", "123456"));
            client.setCallback(new PublishCallback());
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //发送消息并获取回执
    public void publish(String topic, MqttMessage message) throws MqttPersistenceException,
            MqttException, InterruptedException {
        log.info("publish   topic:  "+topic);
        mqttTopic = client.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(message);
        token.waitForCompletion();
        log.info("message is published completely! "
                + token.isComplete());
        log.info("messageId:" + token.getMessageId());
        token.getResponse();
        /*if (client.isConnected())
            client.disconnect(10000);*/
        log.info("Disconnected: delivery token "" + token.hashCode()
                + "" received: " + token.isComplete());
    }
}

4.MQTTSubcribeClient.java

package com.siplock.siplocklockservice.clients;

import com.siplock.siplocklockservice.callbacks.SubcribeCallBack;
import com.siplock.siplocklockservice.util.SslUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MQTTSubcribeClient {
    public String HOST ;
    private MqttClient client;
    private MqttConnectOptions options;

    public MQTTSubcribeClient(String host, String clientid) {
        try {
            this.HOST = host;
            log.info("MQTTSubcribeClient instanced");
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接设置
            options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(false);
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(20);
            // 设置会话心跳时间 单位为秒 服务器会每隔5秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(10);
            options.setAutomaticReconnect(true);//设置自动重连

            // 设置回调
            try {
            	options.setSocketFactory(SslUtil.getSocketFactory("/UserProfile/ca/ca.crt", "/UserProfile/ca/client.crt", "/UserProfile/ca/client.key", "123456"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            client.setCallback(new SubcribeCallBack());
            client.connect(options);
            //订阅消息
            String[] topic1 = {"/ssl"};
            client.subscribe(topic1);
        } catch (MqttException e) {
            e.printStackTrace();
        }

    }
}

5.SslUtil.java

package com.siplock.siplocklockservice.util;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMReader;
import org.bouncycastle.openssl.PasswordFinder;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;

/**
 * Builds an {@link SSLSocketFactory} for mutual TLS from PEM files: a CA
 * certificate used to verify the server, and a client certificate plus private
 * key presented to the server.
 */
public class SslUtil {

    /**
     * Creates a socket factory that trusts the given CA certificate and
     * authenticates with the given client certificate and private key.
     *
     * @param caCrtFile path of the PEM file holding the CA certificate
     * @param crtFile   path of the PEM file holding the client certificate
     * @param keyFile   path of the PEM file holding the client private key
     * @param password  password of the private key; also used to protect the in-memory key store entry
     * @return a socket factory backed by the loaded key and trust material
     * @throws IllegalArgumentException if one of the files contains no PEM object
     * @throws Exception                if a file cannot be read or parsed, or the SSL context cannot be set up
     */
    public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
                                                    final String password) throws Exception {
        Security.addProvider(new BouncyCastleProvider());

        // load CA certificate; try-with-resources closes the reader even when parsing fails
        final X509Certificate caCert;
        try (PEMReader reader = new PEMReader(openPemFile(caCrtFile))) {
            caCert = (X509Certificate) reader.readObject();
        }
        requirePresent(caCert, caCrtFile);

        // load client certificate
        final X509Certificate cert;
        try (PEMReader reader = new PEMReader(openPemFile(crtFile))) {
            cert = (X509Certificate) reader.readObject();
        }
        requirePresent(cert, crtFile);

        // load client private key; the finder hands the reader the key password when it asks for one
        final KeyPair key;
        try (PEMReader reader = new PEMReader(
                openPemFile(keyFile),
                new PasswordFinder() {
                    @Override
                    public char[] getPassword() {
                        return password.toCharArray();
                    }
                }
        )) {
            key = (KeyPair) reader.readObject();
        }
        requirePresent(key, keyFile);

        // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);

        // client key and certificates are sent to server so it can authenticate us
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert});
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // finally, create SSL socket factory.
        // TLS 1.0 ("TLSv1") is deprecated and disabled by default in current JDKs, so request TLS 1.2.
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        return context.getSocketFactory();
    }

    /** Reads the whole file into memory and returns a reader over its content. */
    private static InputStreamReader openPemFile(String path) throws java.io.IOException {
        byte[] content = Files.readAllBytes(Paths.get(path));
        // PEM is ASCII text; name the charset instead of depending on the platform default.
        return new InputStreamReader(new ByteArrayInputStream(content), java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Fails with a message naming the file when no PEM object could be read from it. */
    private static void requirePresent(Object pemObject, String path) {
        if (pemObject == null) {
            throw new IllegalArgumentException("No PEM object found in " + path);
        }
    }
}

6.TestSSLPublish.java

package com.siplock.siplocklockservice;

import com.alibaba.fastjson.JSON;
import com.siplock.siplocklockservice.clients.MQTTPublishClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.HashMap;
import java.util.Map;

/**
 * Manual test entry point: connects a publisher over SSL and publishes one
 * JSON message to the {@code /ssl} topic.
 */
public class TestSSLPublish {

    /**
     * Publishes a single JSON message to the local broker.
     *
     * @param args unused
     * @throws Exception if encoding or publishing the message fails
     */
    public static void main(String[] args) throws Exception {
        // Publishing side; a random client id is used for each run.
        MQTTPublishClient mqttClientSend = new MQTTPublishClient("ssl://127.0.0.1:31883", String.valueOf(Math.random()));
        MqttMessage message = new MqttMessage();
        // Typed map instead of the raw Map type.
        Map<String, String> msg = new HashMap<>();
        msg.put("tag", "hello  ssl");
        message.setPayload(JSON.toJSONString(msg).getBytes("UTF-8"));
        message.setQos(0);
        message.setRetained(false);
        System.out.println("MQTT发送消息");
        mqttClientSend.publish("/ssl",message);
    }
}

7.TestSSLSubcribe.java

package com.siplock.siplocklockservice;

import com.siplock.siplocklockservice.clients.MQTTSubcribeClient;

public class TestSSLSubcribe {

    public static void main(String[] args) {
        MQTTSubcribeClient mqttReceiver = new MQTTSubcribeClient("ssl://127.0.0.1:31883", String.valueOf(Math.random()));
    }
}

接下来我们就直接测试:
首先,启动TestSSLSubcribe(其中会创建MQTTSubcribeClient)来订阅主题:
在这里插入图片描述
再启动TestSSLPublish(其中会创建MQTTPublishClient)向主题发送一条消息:
在这里插入图片描述
如图,消息已经发送成功,我们再来看接收端收到的消息:
在这里插入图片描述
数据都是一致的。以上就是所有代码,如有疑问,欢迎留言

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_22239675/article/details/86543066
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-05-07 23:09:43
  • 阅读 ( 1733 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢