社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
消费者目录结构
MyDecoder类
package com.kd.food.kafka.consumer;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import com.alibaba.fastjson.JSONObject;
import kafka.serializer.Decoder;
public class MyDecoder implements Decoder<Object> {
public MyDecoder() {
this("UTF8");
}
public MyDecoder(final String encoding) {
final Properties props = new Properties();
props.put("serializer.encoding", encoding);
}
@SuppressWarnings("finally")
@Override
public Object fromBytes(byte[] bytes) {
String content = "";
JSONObject object = null;
boolean isbean = true;
try {
content = new String(bytes, "UTF-8");
try {
object = JSONObject.parseObject(content);
} catch (Exception e) {
isbean = false;
} finally {
if (isbean) {
return object;
} else {
return content;
}
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return object;
}
}
ConsumerMessages类
package com.kd.food.kafka.consumer;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonArray;
import com.kd.food.model.UserEquipment;
import com.kd.food.service.UserEquipmentService;
/**
* 接收消息类
* @author 25388
*
*/
public class ConsumerMessages {
@Autowired
private UserEquipmentService userEquipmentService;
/**
* 接受消息方法
* @param msgs
*/
public void processMessage(Map<String, Map<Integer, Object>> msgs) {
try {
for (Map.Entry<String, Map<Integer, Object>> entry : msgs.entrySet()) {
Map<Integer, Object> value = entry.getValue();
for (Map.Entry<Integer, Object> entrys : value.entrySet()){
JSONArray keyArr = (JSONArray) JSONArray.parse(entrys.getValue().toString());
//JSONArray keyArr =(JSONArray)entrys.getValue();
JSONObject keyjson = (JSONObject) keyArr.get(0);
String type = keyjson.getString("type");
String jsons = keyjson.getString("jsons");
JSONObject json = JSONObject.parseObject(jsons);
if("1".equals(type)){//修改设备状态
String uid = json.getString("uid");
String status = json.getString("status");
//封装对象
UserEquipment userEquipment = new UserEquipment();
userEquipment.setStatus(status);
userEquipment.setTerminalId(uid);
//调用修改设备状态
userEquipmentService.updateUserEquipmentByMac(userEquipment);
}
if("2".equals(type)){//设备解绑所有用户
String terminalId = json.getString("uid");
//调用解绑所有用户
userEquipmentService.deleteUserEquipmentByMac(terminalId);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<!-- 接收的频道 也可以理解为接收的工具类 -->
<int:channel id="inputFromKafka">
<int:dispatcher task-executor="kafkaMessageExecutor" />
</int:channel>
<!-- zookeeper配置 可以配置多个 -->
<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="${kafka.zookeeper}" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000" />
<!-- channel配置 auto-startup="true" 否则接收不发数据 -->
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="1" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
<task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" />
<!-- <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" /> -->
<bean id="kafkaDecoder" class="com.kd.food.kafka.consumer.MyDecoder" />
<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="auto.offset.reset">smallest</prop>
<prop key="socket.receive.buffer.bytes">10485760</prop>
<prop key="fetch.message.max.bytes">5242880</prop>
<prop key="auto.commit.interval.ms">1000</prop>
</props>
</property>
</bean>
<!-- 消息接收的BEEN -->
<bean id="kafkaConsumerService" class="com.kd.food.kafka.consumer.ConsumerMessages" />
<!-- 指定接收的方法 -->
<int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumerService" method="processMessage" />
<int-kafka:consumer-context id="consumerContext" consumer-timeout="1000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder" max-messages="5000">
<!-- 两个TOPIC配置 -->
<int-kafka:topic id="${kafka.topic1}" streams="4" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
</beans>
spring.xml 添加上 kafka-constomer.xml
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!