java 集成spring+zookeeper+kafka---消费者 - Go语言中文社区

java 集成spring+zookeeper+kafka---消费者


消费者目录结构






MyDecoder类

package com.kd.food.kafka.consumer;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;

import kafka.serializer.Decoder;

/**
 * Kafka {@link Decoder} that converts a raw message payload into either a
 * fastjson {@link JSONObject} (when the text parses as a JSON object) or the
 * decoded text itself (when parsing fails).
 */
public class MyDecoder implements Decoder<Object> {

	/** Charset used to turn payload bytes into text. */
	private final Charset charset;

	/** Creates a decoder that reads payloads as UTF-8. */
	public MyDecoder() {
		this("UTF8");
	}

	/**
	 * Creates a decoder that reads payloads in the given encoding.
	 *
	 * @param encoding charset name, e.g. {@code "UTF8"}
	 * @throws IllegalArgumentException if the name is null, malformed or not
	 *         supported by this JVM
	 */
	public MyDecoder(final String encoding) {
		// Resolve once, up front: a bad name fails at construction instead of
		// being silently ignored while every payload is decoded as UTF-8.
		this.charset = Charset.forName(encoding);
	}

	/**
	 * Decodes one payload.
	 *
	 * @param bytes raw payload; may be null
	 * @return the parsed {@link JSONObject} when the text is a JSON object, the
	 *         decoded {@link String} when fastjson rejects the text, or
	 *         {@code null} when {@code bytes} is null. The parse result itself
	 *         may presumably be {@code null} for text fastjson treats as "no
	 *         object" (e.g. an empty payload) - TODO confirm against fastjson.
	 */
	@Override
	public Object fromBytes(byte[] bytes) {
		if (bytes == null) {
			return null;
		}
		final String content = new String(bytes, charset);
		try {
			return JSONObject.parseObject(content);
		} catch (RuntimeException notJson) {
			// Not a JSON object: hand the plain text to the consumer instead.
			// Only parse failures are absorbed here; Errors propagate.
			return content;
		}
	}

}


ConsumerMessages类

package com.kd.food.kafka.consumer;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonArray;
import com.kd.food.model.UserEquipment;
import com.kd.food.service.UserEquipmentService;

/**
 * Message receiver: consumes the payload delivered on the Kafka input channel
 * and applies each device command through {@link UserEquipmentService}.
 *
 * @author 25388
 */
public class ConsumerMessages {

	/** Command type: update the status of a device. */
	private static final String TYPE_UPDATE_STATUS = "1";

	/** Command type: unbind all users from a device. */
	private static final String TYPE_UNBIND_ALL_USERS = "2";

	@Autowired
	private UserEquipmentService userEquipmentService;

	/**
	 * Handles one delivery from the inbound channel.
	 *
	 * <p>Every inner value is rendered with {@code toString()} and parsed as a
	 * JSON array; each element of that array is handled as one command. The
	 * map nesting is presumably topic name -> partition id -> batch of decoded
	 * messages, as produced by the Spring Integration Kafka inbound adapter -
	 * confirm against the adapter version in use.
	 *
	 * <p>A failure in one command or batch is printed and does not prevent the
	 * remaining ones from being processed.
	 *
	 * @param msgs nested map of received messages; null is ignored
	 */
	public void processMessage(Map<String, Map<Integer, Object>> msgs) {
		if (msgs == null) {
			return;
		}
		for (Map<Integer, Object> batches : msgs.values()) {
			if (batches == null) {
				continue;
			}
			for (Object batch : batches.values()) {
				processBatch(batch);
			}
		}
	}

	/** Parses one batch into a JSON array and handles every command in it. */
	private void processBatch(Object batch) {
		if (batch == null) {
			return;
		}
		JSONArray commands = null;
		try {
			commands = (JSONArray) JSONArray.parse(batch.toString());
		} catch (Exception e) {
			e.printStackTrace();
		}
		if (commands == null) {
			return;
		}
		// Walk the whole array: handling only index 0 would drop every other
		// message that arrived in the same batch.
		for (int i = 0; i < commands.size(); i++) {
			try {
				handleCommand((JSONObject) commands.get(i));
			} catch (Exception e) {
				// Isolate the failure so later commands are still applied.
				e.printStackTrace();
			}
		}
	}

	/**
	 * Applies a single command: reads {@code type} and the nested
	 * {@code jsons} document, then calls the matching service operation.
	 * Unknown types are ignored.
	 */
	private void handleCommand(JSONObject command) {
		String type = command.getString("type");
		JSONObject json = JSONObject.parseObject(command.getString("jsons"));

		boolean updateStatus = TYPE_UPDATE_STATUS.equals(type);
		boolean unbindAll = TYPE_UNBIND_ALL_USERS.equals(type);
		if (!updateStatus && !unbindAll) {
			return;
		}
		if (json == null) {
			throw new IllegalArgumentException(
					"command of type " + type + " has no \"jsons\" payload");
		}

		String uid = json.getString("uid");
		if (updateStatus) {
			// Build the entity carrying the new status for this terminal.
			UserEquipment userEquipment = new UserEquipment();
			userEquipment.setStatus(json.getString("status"));
			userEquipment.setTerminalId(uid);
			userEquipmentService.updateUserEquipmentByMac(userEquipment);
		} else {
			userEquipmentService.deleteUserEquipmentByMac(uid);
		}
	}
}

kafka-consumer.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Integration Kafka consumer wiring: the inbound channel adapter feeds the
     inputFromKafka channel, whose messages are passed to ConsumerMessages.processMessage. -->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka 
                        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                        http://www.springframework.org/schema/integration 
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task 
                        http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- Receiving channel: messages read from Kafka are placed here and dispatched on kafkaMessageExecutor -->
	<int:channel id="inputFromKafka">
		<int:dispatcher task-executor="kafkaMessageExecutor" />
	</int:channel>
	<!-- ZooKeeper connection settings (the original author notes that several can be configured) -->
	<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="${kafka.zookeeper}" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000" />
	
	<!-- Inbound channel adapter; auto-startup must be "true", otherwise no data is received -->
	<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="inputFromKafka">
		<int:poller fixed-delay="1" time-unit="MILLISECONDS" />
	</int-kafka:inbound-channel-adapter>
	<task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" />
	<!-- <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" /> -->
	
	<bean id="kafkaDecoder" class="com.kd.food.kafka.consumer.MyDecoder" />
	<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="auto.offset.reset">smallest</prop>
				<prop key="socket.receive.buffer.bytes">10485760</prop>
				<prop key="fetch.message.max.bytes">5242880</prop>
				<prop key="auto.commit.interval.ms">1000</prop>
			</props>
		</property>
	</bean>
	<!-- Bean that receives the messages -->
	<bean id="kafkaConsumerService" class="com.kd.food.kafka.consumer.ConsumerMessages" />
	<!-- Method invoked for each message arriving on the channel -->
	<int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumerService" method="processMessage" />

	<int-kafka:consumer-context id="consumerContext" consumer-timeout="1000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
		<int-kafka:consumer-configurations>
			<int-kafka:consumer-configuration group-id="default1" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder" max-messages="5000">
				<!-- Topic configuration (only one topic is configured here) -->
				<int-kafka:topic id="${kafka.topic1}" streams="4" />
			</int-kafka:consumer-configuration>
		</int-kafka:consumer-configurations>
	</int-kafka:consumer-context>
</beans>


在 spring.xml 中引入 kafka-consumer.xml




kafka-consumer.xml
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_37555579/article/details/78944486
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-08 10:51:40
  • 阅读 ( 1360 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢