Kafka Environment Setup and Service Wrapper


I. Prerequisites

1. Kafka version: kafka_2.10-0.10.1.0.tgz
2. ZooKeeper version: zookeeper-3.4.3.tar.gz
3. ZooKeeper cluster:
192.168.1.108:2181,192.168.1.109:2181,192.168.1.110:2181

4. Hostname-to-IP mappings in the hosts file

192.168.1.108 master
192.168.1.109 slave1
192.168.1.110 slave2

II. Setting up ZooKeeper

For the ZooKeeper setup itself, see:

http://blog.csdn.net/liuchuanhong1/article/details/53192618

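III. Setting up Kafka

1. Download the tarball

Download it from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz

2. Extract it under /usr/kafka

With the tarball placed in /usr/kafka, run the following from that directory:

tar -zxvf kafka_2.10-0.10.1.0.tgz

3. Directory structure after extraction

[Screenshot: directory listing of the extracted Kafka distribution]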

IV. Editing the configuration file

1. Edit server.properties

Set the following entries:

broker.id=0
host.name=192.168.1.108
zookeeper.connect=slave1:2181,master:2181,slave2:2181
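
The three entries above are per-broker settings, and every broker in a Kafka cluster needs its own unique broker.id. The article only configures a single broker on master; if brokers were later added on the other two machines, the overrides could be derived as in the rough sketch below. The class name BrokerConfigSketch, the method overridesFor, and the rule of assigning broker.id by list position are assumptions made for illustration, not part of the original setup.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class BrokerConfigSketch {

    // Builds the three server.properties overrides from this section for each host.
    // Assumption of this sketch: broker.id is simply the host's position in the list.
    static Map<String, Properties> overridesFor(List<String> hosts, String zkConnect) {
        Map<String, Properties> result = new LinkedHashMap<>();
        for (int i = 0; i < hosts.size(); i++) {
            Properties p = new Properties();
            p.setProperty("broker.id", String.valueOf(i));   // must be unique per broker
            p.setProperty("host.name", hosts.get(i));
            p.setProperty("zookeeper.connect", zkConnect);    // identical on every broker
            result.put(hosts.get(i), p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("192.168.1.108", "192.168.1.109", "192.168.1.110");
        String zk = "slave1:2181,master:2181,slave2:2181";
        for (Map.Entry<String, Properties> e : overridesFor(hosts, zk).entrySet()) {
            Properties p = e.getValue();
            System.out.println("# server.properties overrides for " + e.getKey());
            System.out.println("broker.id=" + p.getProperty("broker.id"));
            System.out.println("host.name=" + p.getProperty("host.name"));
            System.out.println("zookeeper.connect=" + p.getProperty("zookeeper.connect"));
        }
    }
}
```

The first host in the list gets broker.id=0 and host.name=192.168.1.108, which matches the values shown above for master.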

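V. Starting the Kafka service

1. Start the ZooKeeper cluster

2. Change to the bin directory of the Kafka installation and start the Kafka service with the following command

 ./kafka-server-start.sh ../config/server.properties

After startup, the broker prints its configuration values: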

[root@192 bin]# ./kafka-server-start.sh ../config/server.properties 

[2016-11-18 10:50:51,067] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 0
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        default.replication.factor = 1
        delete.topic.enable = false
        fetch.purgatory.purge.interval.requests = 1000
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name = 
        inter.broker.protocol.version = 0.10.1-IV2
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /home/kafka/kafka-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 0.10.1-IV2
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides = 
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 3
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        producer.purgatory.purge.interval.requests = 1000
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        unclean.leader.election.enable = true
        zookeeper.connect = slave1:2181,master:2181,slave2:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2016-11-18 10:50:51,175] INFO starting (kafka.server.KafkaServer)
[2016-11-18 10:50:51,244] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,261] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-11-18 10:50:51,269] INFO Connecting to zookeeper on slave1:2181,master:2181,slave2:2181 (kafka.server.KafkaServer)
[2016-11-18 10:50:51,308] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-11-18 10:50:51,328] INFO Client environment:zookeeper.version=3.4.8--1, built on 02/06/2016 03:18 GMT (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:host.name=master (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.version=1.8.0_111 (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-11-18 10:50:51,328] INFO Client environment:java.home=/usr/Java/jdk1.8.0_111/jre (org.apache.zookeeper.ZooKeeper)

3. Check the Kafka process

[Screenshot: the running Kafka process]

4. Create a topic

./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafka-test

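5. View the topic details

[Screenshot: details of the kafka-test topic]

VI. Wrapping the Kafka producer as a service

1. Create the Kafka class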

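import java.util.Properties;

import org.apache.log4j.Logger;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Kafka {

	private static final Logger LOG = Logger.getLogger(Kafka.class);

	/**
	 * Broker list used by the producer
	 */
	private String producerBrokerList;

	/**
	 * Serializer class
	 */
	private String producerSerializerClass;

	/**
	 * Topic
	 */
	private String producerTopic;

	/**
	 * Number of retries
	 */
	private int retry;

	/**
	 * Kafka producer
	 */
	private Producer<String, String> kafkaProducer;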

	public Kafka(String producerBrokerList, String producerSerializerClass,
			String producerTopic, int retry) {
		this.producerBrokerList = producerBrokerList;
		this.producerSerializerClass = producerSerializerClass;
		this.producerTopic = producerTopic;
		this.retry = retry;
		init();
	}

	/**
	 * Details: initialization method
	 */
	public void init() {
		kafkaProducer = initProducer(producerBrokerList,
				producerSerializerClass);
	}

	/**
	 * Details: creates the Producer
	 */
	private Producer<String, String> initProducer(String producerBrokerList,
			String producerSerializerClass) {
		Properties props = new Properties();
		props.put("metadata.broker.list", producerBrokerList);
		props.put("serializer.class", producerSerializerClass);
		props.put("key.serializer.class", producerSerializerClass);
		// -1: wait for all in-sync replicas to acknowledge the write
		props.put("request.required.acks", "-1");
		// The property key is "producer.type" ("producerType" is not a valid key).
		// In async mode send() only enqueues the message, so most delivery
		// failures are handled in the background and never reach the catch
		// blocks in sendData; use "sync" if the retry logic should see them.
		props.put("producer.type", "async");
		ProducerConfig producerConfig = new ProducerConfig(props);
		return new Producer<String, String>(producerConfig);
	}

	/**
	 * Details: public entry point for sending a message to Kafka
	 */
	public void sendData(String content) {
		KeyedMessage<String, String> kafkaMessage = new KeyedMessage<String, String>(
				producerTopic, content);
		try {
			kafkaProducer.send(kafkaMessage);
		} catch (Exception e) {
			LOG.warn("send kafka message failed , prepare to retry......", e);
			// Retry sending the message
			retryKafkaSendData(content);
		}
	}

	/**
	 * Details: retry logic used after a send fails
	 */
	public void retryKafkaSendData(String content) {
		KeyedMessage<String, String> kafkaMessage = new KeyedMessage<String, String>(
				producerTopic, content);
		// Fall back to 3 retries when the configured value is not positive
		int maxRetries = retry <= 0 ? 3 : retry;
		for (int i = 1; i <= maxRetries; i++) {
			try {
				kafkaProducer.send(kafkaMessage);
				return;
			} catch (Exception e) {
				LOG.warn("send kafka message failed , retry times:" + i, e);
			}
		}
	}

	/**
	 * Details: shuts down the producer
	 */
	public void close() {
		kafkaProducer.close();
	}
}
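
The retry flow in sendData and retryKafkaSendData (one initial attempt, then up to retry further attempts, with 3 as the fallback when retry is not positive) can be tried out without a broker. The sketch below is only an illustration of that control flow: the Sender interface is a hypothetical stand-in for kafkaProducer.send, and RetrySketch and sendWithRetry are names invented here.

```java
public class RetrySketch {

    /** Hypothetical stand-in for kafkaProducer.send(...). */
    interface Sender {
        void send(String content) throws Exception;
    }

    /**
     * Makes one initial attempt plus up to `retry` retries (3 when retry <= 0).
     * Returns the number of the attempt that succeeded, or -1 if every attempt failed.
     */
    static int sendWithRetry(Sender sender, String content, int retry) {
        int maxRetries = retry <= 0 ? 3 : retry;
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            try {
                sender.send(content);
                return attempt;
            } catch (Exception e) {
                System.out.println("send failed on attempt " + attempt + ": " + e.getMessage());
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        Sender flaky = content -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
        };
        System.out.println("succeeded on attempt " + sendWithRetry(flaky, "hello kafka", 3));
    }
}
```

As in the class above, a message that still fails after the last retry is only logged and then dropped; callers that cannot afford to lose messages would need to act on the failure result.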

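2. Create KafkaFactoryBean

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

public class KafkaFactoryBean implements FactoryBean<Kafka>, InitializingBean,
		DisposableBean {
	/**
	 * Target object produced by this FactoryBean
	 */
	private Kafka kafka;

	/**
	 * Broker list
	 */
	private String producerBrokerList;

	/**
	 * Serializer class
	 */
	private String producerSerializerClass;

	/**
	 * Kafka topic
	 */
	private String producerTopic;

	/**
	 * Number of retries
	 */
	private int retry;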

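	/**
	 * Details: destroy callback, invoked automatically when the bean is destroyed
	 */
	@Override
	public void destroy() throws Exception {
		if (null != kafka) {
			kafka.close();
		}
	}

	/**
	 * Details: invoked by Spring after all properties have been set
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		kafka = new Kafka(producerBrokerList, producerSerializerClass,
				producerTopic, retry);
	}

	/**
	 * Details: returns the object created by the factory. Note that this is
	 * an instance of Kafka, not an instance of the FactoryBean itself.
	 */
	@Override
	public Kafka getObject() throws Exception {
		return this.kafka;
	}

	/**
	 * Details: returns the type of the object produced
	 */
	@Override
	public Class<?> getObjectType() {
		return (this.kafka == null ? Kafka.class : this.kafka.getClass());
	}

	/**
	 * Details: whether the produced object is a singleton
	 */
	@Override
	public boolean isSingleton() {
		return true;
	}

	// Setters required for the <property> injection in the Spring XML configuration

	public void setProducerBrokerList(String producerBrokerList) {
		this.producerBrokerList = producerBrokerList;
	}

	public void setProducerSerializerClass(String producerSerializerClass) {
		this.producerSerializerClass = producerSerializerClass;
	}

	public void setProducerTopic(String producerTopic) {
		this.producerTopic = producerTopic;
	}

	public void setRetry(int retry) {
		this.retry = retry;
	}
}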

If anything about how FactoryBean is used is unclear, see this other article:

http://blog.csdn.net/liuchuanhong1/article/details/52939353
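
The key point of the FactoryBean pattern, that looking up the bean name yields the product (Kafka) rather than the factory, can be imitated in a few lines of plain Java. The sketch below is a toy model and not Spring's implementation: SimpleFactory and TinyRegistry are hypothetical names, and the only behavior borrowed from Spring is the convention that prefixing a bean name with "&" returns the factory itself.

```java
import java.util.HashMap;
import java.util.Map;

public class FactoryBeanSketch {

    /** Hypothetical, stripped-down analogue of a factory-style bean. */
    interface SimpleFactory<T> {
        T getObject();
    }

    /** Toy registry: a plain name resolves to the factory's product, "&name" to the factory. */
    static class TinyRegistry {
        private final Map<String, Object> beans = new HashMap<>();

        void register(String name, Object bean) {
            beans.put(name, bean);
        }

        Object lookup(String name) {
            if (name.startsWith("&")) {
                // "&" prefix: hand back the registered factory itself
                return beans.get(name.substring(1));
            }
            Object bean = beans.get(name);
            if (bean instanceof SimpleFactory) {
                // Plain name: unwrap the factory and return what it produces
                return ((SimpleFactory<?>) bean).getObject();
            }
            return bean;
        }
    }

    public static void main(String[] args) {
        TinyRegistry registry = new TinyRegistry();
        SimpleFactory<StringBuilder> factory = () -> new StringBuilder("stand-in for Kafka");
        registry.register("kafkaFactoryBean", factory);

        System.out.println(registry.lookup("kafkaFactoryBean") instanceof StringBuilder);
        System.out.println(registry.lookup("&kafkaFactoryBean") == factory);
    }
}
```

This is why a field of type Kafka can be injected under the bean name kafkaFactoryBean even though the class registered under that name is KafkaFactoryBean.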

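VII. Testing the producer service

1. Start a console consumer on the server

./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafka-test --from-beginning

The consumer process looks like this:

[Screenshot: the running console consumer]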

2. Spring configuration file for the test (loaded by the test class as applicationContext-kafka.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/util
		http://www.springframework.org/schema/util/spring-util.xsd"
	default-lazy-init="false">

	<context:property-placeholder location="classpath:kafka.properties" />

	<bean id="kafkaFactoryBean" class="com.chhliu.myself.KafkaFactoryBean">
		<property name="producerBrokerList"
			value="${bootstrap.servers}" />
		<property name="producerSerializerClass" value="kafka.serializer.StringEncoder" />
		<!-- Target topic -->
		<property name="producerTopic" value="${kafka.topic}" />
		<!-- Number of retries; defaults to 3 -->
		<property name="retry" value="3" />
	</bean>
</beans>

3. Properties file for the test (kafka.properties)

bootstrap.servers=192.168.1.108:9092
kafka.topic=kafka-test
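
Conceptually, the property placeholder in the Spring configuration replaces each ${key} with the value found under that key in kafka.properties. The sketch below mimics that substitution with the standard library only, to show what values end up in producerBrokerList and producerTopic. It is a simplified illustration rather than Spring's actual resolver, and PlaceholderSketch and resolve are names made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Matches ${some.key} and captures the key
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} in value with the matching property; fails on unknown keys. */
    static String resolve(String value, Properties props) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String replacement = props.getProperty(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + key);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        // Same content as the properties file shown above
        Properties props = new Properties();
        props.load(new StringReader(
                "bootstrap.servers=192.168.1.108:9092\nkafka.topic=kafka-test\n"));

        System.out.println("producerBrokerList = " + resolve("${bootstrap.servers}", props));
        System.out.println("producerTopic      = " + resolve("${kafka.topic}", props));
    }
}
```

Failing loudly on an unknown key is a deliberate choice in this sketch: a mistyped key in the XML is easier to spot at startup than a producer pointed at a literal "${...}" string.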

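4. Run the test code

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext-kafka.xml" })
public class KafkaProducerTest {
	// What gets injected here is the Kafka instance produced by the factory,
	// not an instance of KafkaFactoryBean
	@Resource(name = "kafkaFactoryBean")
	private Kafka kafka;

	@Test
	public void test() {
		// No try/catch here, so an unexpected exception fails the test
		for (int i = 0; i < 1000; i++) {
			kafka.sendData("hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!"
					+ i);
		}
	}
}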

The console consumer output looks like this (excerpt):

hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!601
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!602
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!603
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!604
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!605
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!606
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!607
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!608
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!609
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!610
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!611
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!612
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!613
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!614
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!615
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!616
hello kafka, my name is chhliu111!!!!!!!!!!!!!!!!617

The producer delivered the messages to Kafka, and the console consumer received them.

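Copyright notice: this article comes from CSDN, with thanks to the original author. It is published under the CC 4.0 BY-SA license; when reposting, include a link to the original and this notice.
Original link: https://blog.csdn.net/liuchuanhong1/article/details/53215557
Posted on 2020-03-06 23:03:57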