社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
请确保你已经有证书了,且已经配置好了mqtt的ssl支持。没有配置好的请看之前的文章:自制CA证书,自制客户端,服务端证书、mqtt配置使用ssl加密通信。按照这两篇文章操作之后接下来就可以编写java代码来使用ssl方式连接mqtt了。
本文的示例工程是maven项目,所以需要先添加以下依赖:
<!-- fastjson: used by TestSSLPublish to serialize the message map to a JSON string -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!-- lombok: provides the @Slf4j / @Getter / @Setter annotations used by the callbacks and clients -->
<!-- NOTE(review): no version is declared here; presumably it is managed by a parent POM - confirm -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Eclipse Paho MQTT v3 client: MqttClient, MqttConnectOptions, MqttMessage, callbacks -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<!-- BouncyCastle: SslUtil uses its PEMReader and provider to load PEM certificates and the private key -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.47</version>
</dependency>
整个项目目录结构如图:
核心类是SslUtil.java,里面配置了用于连接mqtt的ca根证书以及客户端证书和密钥。下面列出上图中的各个文件的完整代码:
1.PublishCallback.java
package com.siplock.siplocklockservice.callbacks;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Callback registered on the publishing MQTT client.
 *
 * <p>It only logs connection and delivery events; messages that arrive on this
 * client are ignored.
 */
@Slf4j
public class PublishCallback implements MqttCallbackExtended {

    /**
     * Logs that the connection to the broker was lost.
     *
     * @param cause the reason for the connection loss reported by the client
     */
    @Override
    public void connectionLost(Throwable cause) {
        // The cause is passed as the last argument so the logger records its
        // stack trace; previously the reason for the disconnect was discarded.
        log.info("[PublishCallback] 连接断开", cause);
    }

    /**
     * Logs whether delivery of a published message has completed.
     *
     * @param token the delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------" + token.isComplete());
    }

    /**
     * Does nothing: this callback does not process incoming messages.
     *
     * @param topic   the topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Intentionally empty.
    }

    /**
     * Logs that the connection to the broker has been established.
     *
     * @param b reconnect flag supplied by the client library
     * @param s server URI supplied by the client library
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("connected");
    }
}
2.SubcribeCallBack.java
package com.siplock.siplocklockservice.callbacks;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
public class SubcribeCallBack implements MqttCallbackExtended {
@Override
public void connectionLost(Throwable throwable) {
log.warn("client lost connection,reconnecting");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
try {
// subscribe后得到的消息会执行到这里面
log.info("接收消息主题 : " + s);
log.info("接收消息Qos : " + mqttMessage.getQos());
log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
String str = mqttMessage.toString();
log.info("从MQTT收到的消息为:" + str + " ;TOPIC:" + s);
} catch (Exception e) {
log.error("SubcribeCallBack error:" + e.getMessage());
e.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
@Override
public void connectComplete(boolean b, String s) {
log.info("receive connectted");
}
}
3.MQTTPublishClient.java
package com.siplock.siplocklockservice.clients;
import com.siplock.siplocklockservice.callbacks.PublishCallback;
import com.siplock.siplocklockservice.util.SslUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@Slf4j
public class MQTTPublishClient {
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
public String HOST;
//定义MQTT的ID,可以在MQTT服务配置中指定
private String clientid ;
private MqttClient client;
@Setter
@Getter
private MqttTopic mqttTopic;
/**
* 构造函数
* @throws MqttException
*/
public MQTTPublishClient(String host,String serverId){
log.info("MQTTPublishClient instance");
HOST = host;
clientid = serverId;
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
try {
if (client == null){
client = new MqttClient(HOST, clientid, new MemoryPersistence());
}
} catch (MqttException e) {
e.printStackTrace();
}
connect();
}
/**
* 用来连接服务器
*/
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
// 设置超时时间
options.setConnectionTimeout(20);
// 设置会话心跳时间
options.setKeepAliveInterval(10);
options.setAutomaticReconnect(true);//设置自动重连
try {
options.setSocketFactory(SslUtil.getSocketFactory("/UserProfile/ca/ca.crt", "/UserProfile/ca/client.crt", "/UserProfile/ca/client.key", "123456"));
client.setCallback(new PublishCallback());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
//发送消息并获取回执
public void publish(String topic, MqttMessage message) throws MqttPersistenceException,
MqttException, InterruptedException {
log.info("publish topic: "+topic);
mqttTopic = client.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(message);
token.waitForCompletion();
log.info("message is published completely! "
+ token.isComplete());
log.info("messageId:" + token.getMessageId());
token.getResponse();
/*if (client.isConnected())
client.disconnect(10000);*/
log.info("Disconnected: delivery token "" + token.hashCode()
+ "" received: " + token.isComplete());
}
}
4.MQTTSubcribeClient.java
package com.siplock.siplocklockservice.clients;
import com.siplock.siplocklockservice.callbacks.SubcribeCallBack;
import com.siplock.siplocklockservice.util.SslUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@Slf4j
public class MQTTSubcribeClient {
public String HOST ;
private MqttClient client;
private MqttConnectOptions options;
public MQTTSubcribeClient(String host, String clientid) {
try {
this.HOST = host;
log.info("MQTTSubcribeClient instanced");
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置超时时间 单位为秒
options.setConnectionTimeout(20);
// 设置会话心跳时间 单位为秒 服务器会每隔5秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(10);
options.setAutomaticReconnect(true);//设置自动重连
// 设置回调
try {
options.setSocketFactory(SslUtil.getSocketFactory("/UserProfile/ca/ca.crt", "/UserProfile/ca/client.crt", "/UserProfile/ca/client.key", "123456"));
} catch (Exception e) {
e.printStackTrace();
}
client.setCallback(new SubcribeCallBack());
client.connect(options);
//订阅消息
String[] topic1 = {"/ssl"};
client.subscribe(topic1);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
5.SslUtil.java
package com.siplock.siplocklockservice.util;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMReader;
import org.bouncycastle.openssl.PasswordFinder;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;
/**
 * Builds an {@link SSLSocketFactory} for mutual TLS from PEM files: a CA
 * certificate used to verify the server, and a client certificate plus private
 * key presented to the server.
 */
public class SslUtil {

    /**
     * Creates a socket factory that trusts the given CA certificate and
     * authenticates with the given client certificate and private key.
     *
     * @param caCrtFile path to the PEM file holding the CA certificate
     * @param crtFile   path to the PEM file holding the client certificate
     * @param keyFile   path to the PEM file holding the client private key
     * @param password  password of the private key; also used to protect the
     *                  in-memory key store entry; must not be null
     * @return a socket factory configured with the key and trust material
     * @throws Exception if a file cannot be read or parsed, or the SSL context
     *                   cannot be initialised
     */
    public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
                                                    final String password) throws Exception {
        Security.addProvider(new BouncyCastleProvider());

        X509Certificate caCert = readCertificate(caCrtFile);
        X509Certificate cert = readCertificate(crtFile);
        KeyPair key = readKeyPair(keyFile, password);

        // CA certificate is used to authenticate the server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);

        // Client key and certificate are sent to the server so it can authenticate us
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert});
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // "TLSv1" pinned the context to the obsolete TLS 1.0, which current JDKs
        // disable by default and many brokers reject. "TLSv1.2" is used instead.
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return context.getSocketFactory();
    }

    /** Reads the first object of a PEM file and returns it as an X.509 certificate. */
    private static X509Certificate readCertificate(String path) throws Exception {
        // try-with-resources closes the reader even when parsing throws.
        try (PEMReader reader = new PEMReader(new InputStreamReader(
                new ByteArrayInputStream(Files.readAllBytes(Paths.get(path))),
                java.nio.charset.StandardCharsets.UTF_8))) {
            Object parsed = reader.readObject();
            if (!(parsed instanceof X509Certificate)) {
                throw new IllegalArgumentException("No X.509 certificate found in PEM file: " + path);
            }
            return (X509Certificate) parsed;
        }
    }

    /** Reads the first object of a PEM file and returns it as a key pair, decrypting with the password if asked. */
    private static KeyPair readKeyPair(String path, final String password) throws Exception {
        PasswordFinder finder = new PasswordFinder() {
            @Override
            public char[] getPassword() {
                return password.toCharArray();
            }
        };
        try (PEMReader reader = new PEMReader(new InputStreamReader(
                new ByteArrayInputStream(Files.readAllBytes(Paths.get(path))),
                java.nio.charset.StandardCharsets.UTF_8), finder)) {
            Object parsed = reader.readObject();
            if (!(parsed instanceof KeyPair)) {
                throw new IllegalArgumentException("No key pair found in PEM file: " + path);
            }
            return (KeyPair) parsed;
        }
    }
}
6.TestSSLPublish.java
package com.siplock.siplocklockservice;
import com.alibaba.fastjson.JSON;
import com.siplock.siplocklockservice.clients.MQTTPublishClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.HashMap;
import java.util.Map;
/**
 * Manual test: connects to the local broker over SSL and publishes one JSON
 * message to the {@code /ssl} topic.
 */
public class TestSSLPublish {

    /**
     * Publishes {@code {"tag":"hello ssl"}} with QoS 0 and without the retained flag.
     *
     * @param args unused
     * @throws Exception if publishing fails
     */
    public static void main(String[] args) throws Exception {
        // MQTT sender; a random number serves as the client ID
        MQTTPublishClient mqttClientSend = new MQTTPublishClient("ssl://127.0.0.1:31883", String.valueOf(Math.random()));
        MqttMessage message = new MqttMessage();
        // Typed map instead of the raw Map type
        Map<String, String> msg = new HashMap<>();
        msg.put("tag", "hello ssl");
        // Charset constant instead of the "UTF-8" name lookup
        message.setPayload(JSON.toJSONString(msg).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(0);
        message.setRetained(false);
        System.out.println("MQTT发送消息");
        mqttClientSend.publish("/ssl", message);
    }
}
7.TestSSLSubcribe.java
package com.siplock.siplocklockservice;
import com.siplock.siplocklockservice.clients.MQTTSubcribeClient;
public class TestSSLSubcribe {
public static void main(String[] args) {
MQTTSubcribeClient mqttReceiver = new MQTTSubcribeClient("ssl://127.0.0.1:31883", String.valueOf(Math.random()));
}
}
接下来我们就直接测试:
首先,运行TestSSLSubcribe(它会创建MQTTSubcribeClient)来订阅主题:
再运行TestSSLPublish(它会通过MQTTPublishClient)向主题发送一条消息:
如图,消息已经发送成功,我们再来看接收端收到的消息:
数据都是一致的。以上就是所有代码,如有疑问,欢迎留言
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!