社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
Kafka是一种高吞吐量的分布式发布订阅消息系统,是观察者模式的一种实现。
Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
kafka/config/server.properties配置文件:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
可以理解为一个队列。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
Partition是物理上的概念,每个Topic包含一个或多个Partition.
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
负责发布消息到Kafka broker
消息消费者,向Kafka broker读取消息的客户端。
Kafak中同一消费者组不能同时消费一个分区,只能去消费不同分区,因此在生产环境中,消费者consumer的数量和分区数保持一致即可。
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成
每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
1)分区的原因
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
2)分区的原则
(1)指定了patition,则直接使用;
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。
1)producer先从zookeeper的 "/brokers/…/state"节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK。
ACK可以设置 0 1 all等。0代表发送出去不接受确认信息,1代表只要leader回复确认信息即可,all代表kafka集群必须全部回复确认信息。
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件)
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
属性 | 默认值 | 描述 |
---|---|---|
broker.id | 必填参数,broker的唯一标识 | |
log.dirs | /tmp/kafka-logs | Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录。 |
port | 9092 | BrokerServer接受客户端连接的端口号 |
zookeeper.connect | null | Zookeeper的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。可以填一个或多个,为了提高可靠性,建议都填上。注意,此配置允许我们指定一个zookeeper路径来存放此kafka集群的所有数据,为了与其他应用集群区分开,建议在此配置中指定本集群存放目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消费者的参数要和此参数一致。 |
message.max.bytes | 1000000 | 服务器可以接收到的最大的消息大小。注意此参数要和consumer的maximum.message.size大小一致,否则会因为生产者生产的消息太大导致消费者无法消费。 |
num.io.threads | 8 | 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量。 |
queued.max.requests | 500 | I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。 |
socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUFF buffer the server prefers for socket connections. |
socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUFF buffer the server prefers for socket connections. |
socket.request.max.bytes | 100 * 1024 * 1024 | 服务器允许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size. |
num.partitions | 1 | 默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5 |
log.segment.bytes | 1024 * 1024 * 1024 | Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖。 |
log.roll.{ms,hours} | 24 * 7 hours | 新建segment文件的时间,此值可以被topic级别的参数覆盖。 |
log.retention.{ms,minutes,hours} | 7 days | Kafka segment log的保存周期,保存周期超过此时间日志就会被删除。此参数可以被topic级别参数覆盖。数据量大时,建议减小此值。 |
log.retention.bytes | -1 | 每个partition的最大容量,若数据量超过此值,partition数据将会被删除。注意这个参数控制的是每个partition而不是topic。此参数可以被log级别参数覆盖。 |
log.retention.check.interval.ms | 5 minutes | 删除策略的检查周期 |
auto.create.topics.enable | true | 自动创建topic参数,建议此值设置为false,严格控制topic管理,防止生产者错写topic。 |
default.replication.factor | 1 | 默认副本数量,建议改为2。 |
replica.lag.time.max.ms | 10000 | 在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除。 |
replica.lag.max.messages | 4000 | 如果replica节点落后leader节点此值大小的消息数量,leader节点就会将其从ISR中移除。 |
replica.socket.timeout.ms | 30 * 1000 | replica向leader发送请求的超时时间。 |
replica.socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests to the leader for replicating data. |
replica.fetch.max.bytes | 1024 * 1024 | The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. |
replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader. |
num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker. |
fetch.purgatory.purge.interval.requests | 1000 | The purge interval (in number of requests) of the fetch request purgatory. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session 超时时间。如果在此时间内server没有向zookeeper发送心跳,zookeeper就会认为此节点已挂掉。 此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。 |
zookeeper.connection.timeout.ms | 6000 | 客户端连接zookeeper的超时时间。 |
zookeeper.sync.time.ms | 2000 | H ZK follower落后 ZK leader的时间。 |
controlled.shutdown.enable | true | 允许broker shutdown。如果启用,broker在关闭自己之前会把它上面的所有leaders转移到其它brokers上,建议启用,增加集群稳定性。 |
auto.leader.rebalance.enable | true | If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available. |
leader.imbalance.per.broker.percentage | 10 | The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker. |
leader.imbalance.check.interval.seconds | 300 | The frequency with which to check for leader imbalance. |
offset.metadata.max.bytes | 4096 | The maximum amount of metadata to allow clients to save with their offsets. |
connections.max.idle.ms | 600000 | Idle connections timeout: the server socket processor threads close the connections that idle more than this. |
num.recovery.threads.per.data.dir | 1 | The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. |
unclean.leader.election.enable | true | Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss. |
delete.topic.enable | false | 启用deletetopic参数,建议设置为true。 |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic. |
offsets.retention.check.interval.ms | 600000 | The frequency at which the offset manager checks for stale offsets. |
offsets.topic.replication.factor | 3 | The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas. |
offsets.topic.segment.bytes | 104857600 | Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads. |
offsets.load.buffer.size | 5242880 | An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache. |
offsets.commit.required.acks | -1 | The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden. |
offsets.commit.timeout.ms | 5000 | The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout. |
kafka提供了两套consumer API:高级Consumer API和低级Consumer API。
1)高级API优点
高级API 写起来简单
不需要自行去管理offset,系统通过zookeeper自行管理。
不需要管理分区,副本等情况,系统自动管理。
消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)
可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
2)高级API缺点
不能自行控制offset(对于某些特殊需求来说)
不能细化控制如分区、副本、zk等
1)低级 API 优点
能够让开发者自己控制offset,想从哪里读取就从哪里读取。
自行控制连接分区,对分区自定义进行负载均衡
对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)
2)低级API缺点
太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
可以使用spring的kafka,依赖如下:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
原生用法如下。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
spring:
kafka:
properties:
max.request.size: 10097152
zookeeper.connection.timeout.ms: 100000
consumer:
bootstrap-servers: 10.10.10.10:9092
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.request.size: 10097152
max.partition.fetch.bytes: 10097152
group-id: demo-id
@Configuration
public class KafkaConfig {
@Bean
public Properties getProperties(){
Properties properties = new Properties();
properties.put("bootstrap.servers", "10.91.159.28:9092,10.91.159.29:9092,10.91.159.30:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("lingers.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
@Bean
public KafkaProducer<String, String> kafkaProducer(@Autowired @Qualifier("getProperties") Properties kafkaProperties) {
return new KafkaProducer<String, String>(kafkaProperties);
}
}
@Autowired
private Producer<String, String> producer;
producer.send(new ProducerRecord<String, String>("luowenhui", JSONObject.toJSONString(userinfo)));
注意:给kafka里送的消息,一般一条条的发送数据,这样可以轮询分发给不同的consumer,提高性能。对于有关联的数据可以选择批量发送。
@Configuration
public class KafkaConfig {
@Bean
public Properties kafkaProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "10.91.159.28:9092,10.91.159.29:9092,10.91.159.30:9092");
properties.put("group.id", "hello");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return properties;
}
@Bean
public KafkaConsumer<String, String> kafkaConsumer0(@Autowired @Qualifier("kafkaProperties") Properties kafkaProperties) {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);
kafkaConsumer.assign(Arrays.asList(new TopicPartition("luo", 0)));//按区接收
//消费者订阅的topic, 可同时订阅多个
//consumer.subscribe(Arrays.asList("first", "second","third"));
return kafkaConsumer;
}
}
assign方法和subscribe区别:
Kafka里的消费者组有两个使用的场景:
while (true) {
ConsumerRecords<String, String> records = consumer0.poll(100);
for (ConsumerRecord<String, String> record : records) {//record.value()就是获取到的数据
}
}
Ctrl+C停止命令
功能 | 命令 | 备注 |
---|---|---|
进入broker配置 | [root@node1 kafka_2.11-1.1.1]# vi config/server.properties | |
启动kafka | [atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties | |
关闭集群 | [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop | |
查看当前服务器中的所有topic | [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list | |
创建topic | [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first | –topic 定义topic名;–replication-factor 定义副本数(副本数不能大于kafka的集群节点数,否则会报错,HDFS不会报错,因为它定义的副本数是最大副本数,假如是10,当只有3台hdfs节点时,副本数其实是3,假如加一台新的节点,副本数变为4,当加到11是,副本数最大为10);–partitions 定义分区数 |
删除topic | [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first | 需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启 |
发送消息 | [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first | |
消费消息 | kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic first | |
查看某个Topic的详情 | [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first | Leader:Leader的主机号 broker-id;Replicas:副本所在的主机号 0 1 2 代表三个主机;Isr:选举使用,并且是按数据性差异排序的,与leader差异性最小,排名越靠前,刚开始的leader是随机选取的 |
查看存在的topic | bin/kafka-topics.sh --zookeeper hadoop1:2181 --list | _consumer_offsets的topic里面存储着offset,kafka默认分为50个分区,分布再不同的主机上。每存入一次数据,_consumer_offsets+1 |
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!