【大数据笔记05】Kafka - Go语言中文社区

【大数据笔记05】Kafka


Apache Kafka

在这里插入图片描述

Kafka核心组件

  • Producer :消息生产者,就是向 kafka broker 发消息的客户端。
  • Consumer :消息消费者,向 kafka broker 取消息的客户端
  • Topic :名称。
  • Consumer Group (CG):这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic。
  • Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给 consumer,不保证一个 topic 的整体(多个 partition 间)的顺序。
  • Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是00000000000.kafka
  • Replication:Kafka 支持以 Partition 为单位对 Message 进行冗余备份,每个 Partition 都可以配置至少 1 个 Replication(当仅 1 个 Replication 时即仅该 Partition 本身)。
  • Leader:每个 Replication 集合中的 Partition 都会选出一个唯一的 Leader,所有的读写请求都由Leader 处理。其他 Replicas 从 Leader 处把数据更新同步到本地,过程类似大家熟悉的 MySQL中的 Binlog 同步。每个 Cluster 当中会选举出一个 Broker 来担任 Controller,负责处理 Partition的 Leader 选举,协调 Partition 迁移等工作。
  • ISR(In-Sync Replica):是 Replicas 的一个子集,表示目前 Alive 且与 Leader 能够“Catch-up”的Replicas 集合。 由于读写都是首先落到 Leader 上,所以一般来说通过同步机制从 Leader 上拉取数据的 Replica 都会和 Leader 有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该 Replica 踢出 ISR。每个 Partition 都有它自己独立的 ISR。

Kafka集群部署

  1. 下载安装包:wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz

  2. 解压,修改目录名:
    tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
    cd /export/servers/
    mv kafka_2.11-1.0.0 kafka

  3. 修改配置文件
    cp /export/servers/kafka/config/server.properties /export/servers/kafka/config/server.properties.bak
    vi /export/servers/kafka/config/server.properties

    #broker 的全局唯一编号,不能重复
    broker.id=0
    #用来监听链接的端口,producer 或 consumer 将在此端口建立连接
    port=9092
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接受套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志存放的路径
    log.dirs=/export/servers/logs/kafka
    #topic 在当前 broker 上的分片个数
    num.partitions=2
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #滚动生成新的 segment 文件的最大时间
    log.roll.hours=168
    #日志文件中每个 segment 的大小,默认为 1G
    log.segment.bytes=1073741824
    #周期性检查文件大小的时间
    log.retention.check.interval.ms=300000
    #日志清理是否打开
    log.cleaner.enable=true
    #broker 需要使用 zookeeper 保存 meta 数据
    zookeeper.connect=192.168.52.106:2181,192.168.52.107:2181,192.168.52.108:2181
    #zookeeper 链接超时时间
    zookeeper.connection.timeout.ms=6000
    #partion buffer 中,消息的条数达到阈值,将触发 flush 到磁盘
    log.flush.interval.messages=10000
    #消息 buffer 的时间,达到阈值,将触发 flush 到磁盘
    log.flush.interval.ms=3000
    #删除 topic 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除
    delete.topic.enable=true
    #此处的
    host.name 为本机 IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
    host.name=kafka01

    1. 分发安装包
    2. 依次修改各broker(各个服务器)的broker.id不得重复
    3. 启动集群
      nohup /export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 &
      输出错误日志到黑洞(关于“黑洞”的说法,见:https://blog.csdn.net/loongshawn/article/details/50514018
      command >/dev/null 2>&1 &

底层原理

Consumer和Topic的关系
本质上 kafka 只支持 Topic;

  • 每个 group 中可以有多个 consumer,每个 consumer 属于一个 consumer group;通常情况下,一个 group 中会包含多个 consumer,这样不仅可以提高 topic 中消息的并发消费能力,而且还能提高"故障容错"性,如果 group 中的某个 consumer 失效那么其消费的 partitions 将会有其他 consumer 自动接管。
  • 对于 Topic 中的一条特定的消息,只会被订阅此 Topic 的每个 group 中的其中一个 consumer 消费,此消息不会发送给一个 group 的多个 consumer;那么一个 group 中所有的 consumer 将会交错的消费整个 Topic,每个 group 中 consumer 消息消费互相独立,我们可以认为一个 group 是一个"订阅"者。
  • 在 kafka 中,一个 partition 中的消息只会被 group 中的一个 consumer 消费( 同一时刻);一个 Topic 中的每个 partions,只会被一个"订阅者"中的一个 consumer 消费,不过一个 consumer 可以同时消费多个 partitions 中的消息。
  • kafka 的设计原理决定,对于一个 topic,同一个 group 中不能有多于 partitions 个数的 consumer 同时消费,否则将意味着某些 consumer 将无法得到消息。kafka 只能保证一个 partition 中的消息被某个 consumer 消费时是顺序的;事实上,从 Topic角度来说, 当有多个 partitions 时,消息仍不是全局有序的。

Kafka消息的分发
Producer 客户端负责消息的分发

  • kafka 集群中的任何一个 broker 都可以向 producer 提供 metadata 信息,这些 metadata 中包含"集群中存活的 servers 列表"/"partitions leader 列表"等信息;
  • 当 producer 获取到 metadata 信息之后, producer 将会和 Topic 下所有 partition leader 保持socket 连接;
  • 消息由 producer 直接通过 socket 发送到 broker,中间不会经过任何"路由层",事实上,消息被路由到哪个 partition 上由 producer 客户端决定;比如可以采用"random"“key-hash”“轮询"等, 如果一个 topic 中有多个 partitions, 那么 在producer 端实现” 消息均衡分发" 是必要的。
  • 在 producer 端的配置文件中,开发者可以指定 partition 路由的方式。Producer 消息发送的应答机制
    设置发送数据是否需要服务端的反馈,有三个值 0,1,-1
    0: producer 不会等待 broker 发送 ack
    1: 当 leader 接收到消息之后发送 ack
    -1: 当所有的 follower 都同步消息成功后发送 ack
    request.required.acks=0

Consumer的负载均衡
在这里插入图片描述
当一个 group 中,有 consumer 加入或者离开时,会触发 partitions 均衡.均衡的最终目的,是提升 topic 的并发消费能力,步骤如下:
1、 假如 topic1,具有如下 partitions: P0,P1,P2,P3
2、 加入 group 中,有如下 consumer: C0,C1
3、 首先根据 partition 索引号对 partitions 排序: P0,P1,P2,P3
4、 根据
consumer.id 排序: C0,C1
5、 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值 M=2(向上取整)
6、 然后依次分配 partitions: C0 = [P0,P1],C1=[P2,P3],即 Ci = [P(i * M),P((i + 1) * M -1)]
在这里插入图片描述

Kafka的文件存储机制(重点:分片-副本机制)
在这里插入图片描述

  • 在Kafka文件存储中,同一个 topic下有多个不同partition,每个 partition 为一个目录,partiton命名规则为 topic 名称+有序序号,第一个 partiton 序号从 0 开始,序号最大值为 partitions数量减 1。
  • 每个 partion(目录)相当于一个巨型文件被平均分配到多个大小相等 segment(段)数据文件中。 但每个段 segment file 消息数量不一定相等,这种特性方便 old segment file 快速被删除。默认保留 7 天的数据。
  • 每个 partiton 只需要支持顺序读写就行了,segment 文件生命周期由服务端配置参数决定。(什么时候创建,什么时候删除)
    在这里插入图片描述
  • Kafka Partition Segment
    • Segment file 组成:由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件。
      在这里插入图片描述
    • Segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
    • 索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message 的物理偏移地址。
      在这里插入图片描述

消息不丢失机制

  • 生产者端消息不丢失
    在这里插入图片描述
  1. 消息生产分为同步模式和异步模式
  2. 消息确认分为三个状态
    a) 0:生产者只负责发送数据
    b) 1:某个partition的leader收到数据给出响应
    c) -1:某个partition的所有副本都收到数据后给出响应
  3. 在同步模式下
    a) 生产者等待10S,如果broker没有给出ack响应,就认为失败。
    b) 生产者重试3次,如果还没有响应,就报错。
  4. 在异步模式下
    a) 先将数据保存在生产者端的buffer中。Buffer大小是2万条。
    b) 满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
    c) 发送一批数据的大小是500条。
    如果broker迟迟不给ack,而buffer又满了。
    开发者可以设置是否直接清空buffer中的数据。
  • Broker端消息不丢失
    broker端的消息不丢失,其实就是用partition副本机制来保证
    Producer ack -1. 能够保证所有的副本都同步好了数据。其中一台机器挂了,并不影像数据的完整性。

  • 消费者端消息不丢失
    在这里插入图片描述
    只要记录offset值,消费者端不会存在消息不丢失的可能。只会重复消费。

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_43837588/article/details/88744251
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-04-19 18:02:14
  • 阅读 ( 1523 )
  • 分类:大数据

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢