kafka的基本原理和springBoot整合kafka - Go语言中文社区

kafka的基本原理和springBoot整合kafka


1.kafka的简单介绍

Apache kafka官网上对kafka的简单别成为 a distributed streaming platform  即一个分布式流平台

主要有三大功能 :

publish and subscribe  发布和订阅   Read and write streams of data like a messaging system. 像消息系统一样读写数据流

process(kafka stream )   处理数据  The easiest way to write mission-critical real-time applications and microservices

Storage   存储系统   

简单的说kafka可以作为messaging, storage, and stream processing 进行使用,本文主要介绍messaging

2.kafka的消息系统

kafka相关术语:

broker kafka集群中的一个服务器

topic 每个消息发布到kafka上都有一个主题,一个主题可以保存在一个broker或多个broker上,但对于用户来说不用关心

partition 分区 是物理上的概念,每个topic包含一个或多个分区,且每个分区内消息有序,符合FIFO,设置分区是为了扩展某个topic,由于某个单个服务器可能不能应对一个巨大数量消息的topic,设置partition可以扩展,一个partition就是物理机上的一个文件夹,segmemt就是该文件夹中消息文件。

producer 消息的发布者

consumer 消息的消费者

offset 偏移量   The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition  在分区中唯一标识记录的id号,而且这个offset是由消费者控制的,默认的情况下是线性的将offset进行偏移,但可以由使用者控制,例如,可以设置到过去的偏移量,也可以直接跳到现在的消费位置。

 consumer group   消费组的概念,消费组和topic是一一对应的,例如,tipic里的一个消息只能被一个消费组消费一次,如果再一个消费组里有多个消费者,则采用load balanced的方式消费,如果所有的消费者都不属于同一个消费者,则订阅该topic的消息所有的消费者都会接收到消息,消费组的概念是为了补充多个消费者竞争分割的处理消息

 

kafka client 和server 是如何通讯的

In Kafka the communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol 

kafka的客户端和服务器通讯使用的tcp协议

3.在linux上搭建kafka集群

 环境   vmware workstation 虚拟机linux 操作系统 Linux centos7 3.10.0-327.el7.x86_64 #1 SMP Thu Nov 19 22:10:57 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

zookeeper   zookeeper-3.4.12 使用单独的zookeeper,不使用内嵌的zookeeper

kafka    kafka_2.12-2.0.0

操作过程:

解压 tar -xvf  zookeeper-3.4.12.jar

启动 zkServer.sh start    默认使用的conf/zoo.cfg 这个配置文件

查看是否启动成功  ps -ef |grep zookeeper  zookeeper已启动成功

启动第一个kafka server

解压 tar -xvf  kafka_2.12-2.0.0.jar

编辑配置文件 将 config目录下的server.properties重命名为server1.properties

具体配置如下 ,主要的配置就是监听的端口和日志的目录,由于是虚拟机中,就明确指定虚拟机host-only的ip地址,这里为

192.168.100.209 

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.100.209:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.100.209:9092
host.name=192.168.100.209
port=9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/log1

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000


############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances duri
ng application startup.
group.initial.rebalance.delay.ms=0

启动kafka   bin/kafka-server-start.sh config/server1.properties

查看启动日志,启动成功

对单个的kafka进行测试

使用linux命令创建toptic并添加消息 

 bin/kafka-console-producer.sh --broker-list 192.168.100.209:9092 --topic my-replicated-topic

>my test message 1

>my test message 2

再利用windows下的可视化工具 kafka tools 2.0进行查看 ,至此kafka单个基于zookeeper的环境就搭建成功了

 

依次启动其它两个kafka server 

 bin/kafka-server-start.sh config/server2.properties

 bin/kafka-server-start.sh config/server3.properties

三个节点已启动完成

4.spring整合kafka client进行生产和消费 

注意kafka server版本 与kafka client 版本的对应关系

本文中kafka的版本是 kafka_2.12-2.0.0

springBoot 的版本是2.0.7RELEASE 对应依赖的kafka clients 1.0.2  版本是符合的

参考spring kafka 官方文档  https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html

spring  整合kafka有多种方式,这里只介绍springBoot整合的方式

关键是kafkaTemplate 和 @KafkaListener   sending message 和 recieving message

1)首先新建几个topic


	//kafkaAdmin与Producer和consumer不一样,可用来创建topic ,这里设置   kafkaAdmin.setFatalIfBrokerNotAvailable(true);
	//就意味着上下文启动的时候,admin就会创建NewTopic的topic,如果创建失败上下文启动失败,默认的设置为flase,即不会启动失败
	@Bean
	public KafkaAdmin admin() {
	    Map<String, Object> configs = new HashMap<>();
	    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
	    		"192.168.100.209:9092,192.168.100.209:9093,192.168.100.209:9094" );
	    KafkaAdmin kafkaAdmin = new KafkaAdmin(configs);
	    kafkaAdmin.setFatalIfBrokerNotAvailable(true);
	    return kafkaAdmin;
	}

	@Bean
	public NewTopic topic1() {
	    return new NewTopic("foo", 10, (short) 2);
	}

	@Bean
	public NewTopic topic2() {
	    return new NewTopic("bar", 10, (short) 2);
	}

启动项目成功后,成功的创建了topic

2.发送消息,Producer的配置

    @Bean
	public ProducerFactory<Integer, String> producerFactory() {
	    return new DefaultKafkaProducerFactory<>(producerConfigs());
	}

	@Bean
	public Map<String, Object> producerConfigs() {
	    Map<String, Object> props = new HashMap<>();
	    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.209:9092,192.168.100.209:9093,192.168.100.209:9094");
	    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
	    return props;
	}

	@Bean
	public KafkaTemplate<Integer, String> kafkaTemplate() {
	    return new KafkaTemplate<Integer, String>(producerFactory());
	}

发送消息如下,注意这里的app_log的topic在kafka里原来是不存在的,即发不存在的topic会创建,创建的topic app_log的partition是为2,replicas为1,这两个参数怎么来的,是不是和kafkaTemplate的配置有关,这里还不是很清楚。不过生产应用

时都是先创建topic,再向topic中生产消息和消费消息,另外  kafka 0.11版本后支持事物,即多个消息要么都发送成功,要么都发送失败,这里不演示


	@Autowired
	private KafkaTemplate kafkaTemplate;

	public void send(String message) {
		ListenableFuture future = kafkaTemplate.send("app_log", message);
		future.addCallback(o -> System.out.println("send-消息发送成功:" + message),
				throwable -> System.out.println("消息发送失败:" + message));
	}

这里说说,kafkaTemplate.send(topic,key,data),这里的key是保证每次发送到同一partition里,就可以保证整体有序。

当然也可以直接指定 partition 的id  kafkaTemplate.send(topic,partitionId,key,data)

kafkaTemplate.send(topic,partitionId,timestamp,key,data) 关于kafka记录中timestamp的意义,其中一条作用就是定位消息,因为可能不记得消息的offset

参见  https://www.cnblogs.com/huxi2b/p/6050778.html 

ProducerRecord:增加了timestamp字段,允许producer指定消息的时间戳,如果不指定的话使用producer客户端的当前时间

3.Recieving message

Messages can be received by configuring a MessageListenerContainer and providing a Message Listener, or by using the @KafkaListener annotation.  

这里演示 @KafkaListener的方式进行消费,先看下配置

@Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.209:9092,192.168.100.209:9093,192.168.100.209:9094");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // See https://kafka.apache.org/documentation/#consumerconfigs for more properties
	    //org.apache.kafka.common.serialization.Deserializer
        return props;
    }

消费代码

  

       //@KafkaListener 默认是从当前时间开始消费
	   @KafkaListener(id = "foo", topics = "foo")
	    public void listen(String data) {
	       System.out.println("接收到消息" + data);
	    }
	    
	   @KafkaListener(id = "myContainer2",topics = "app_log")
	    public void listen2(ConsumerRecord<?, ?> record){
	        System.out.println("topic:" + record.topic());
	        System.out.println("key:" + record.key());
	        System.out.println("value:"+record.value());
	    }

比较复杂的@KafkaListener 的配置,指定topic,partitions,initialOffest等

@KafkaListener(id = "myContainer2",topicPartitions = { @TopicPartition(topic = "foo", partitions = { "0", "3" }),
               @TopicPartition(topic = "app_log", partitions = "0",
               partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})

更多关于consumer的配置详见spring文档,比如offset手动提交等配置,至此kafka已可以基本使用,项目代码地址

https://gitee.com/dahaizhenqiang/app_data.git

 

 

 

 

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_42739473/article/details/87688215
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-25 01:03:17
  • 阅读 ( 1402 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢