kafka 重复数据读取原因 - Go语言中文社区

kafka 重复数据读取原因


       之前一直遇到kafka数据读取重复的问题,但都通过一些方式去避免了,今天专门去探究了下原因。出现这个问题,一般都是设置kafkaoffset自动提交的时候发生的。原因在于数据处理时间大于max.poll.interval.ms(默认300s),导致offset自动提交失败,以致offset没有提交。重新读取数据的时候又会读取到kafka之前消费但没有提交offset的数据,从而导致读取到重复数据。具体错误如下:

[WARN]2018-08-07 16:31:51,502 method:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. maybeAutoCommitOffsetsSync (ConsumerCoordinator.java:664)
[Consumer clientId=consumer-1, groupId=first] Synchronous auto-commit of offsets {autoCommit3-0=OffsetAndMetadata{offset=44 , metadata=''}, autoCommit3-1=OffsetAndMetadata{offset=60, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

原因探究       

仔细想想不应该啊,自动提交offset和数据处理时间难道还有关系?为此,我专门阅读了kafka的某些源码。我们使用的Kafka的api,

调用的是KafkaConsumer的poll方法:

   给方法调用了pollOnce方法:

    该方法有调用了ConsumerCoordinator 的poll方法:

该方法的最后调用了自动offset同步的方法:

关键就在这个方法,这个方法只有在poll的时候才会调用,如果,数据处理时间操过poll的最大时间,就会导致本文开始的错误,而不能提交offset.

 

解决方案:

  1. 增大max.poll.interval.ms,默认是300s,不推荐
  2. 减小max.poll.records,推荐
  3. 使用手动提交offset方法consumer.commitSync()/consumer.commitAsync(),在不确定数据情况下推荐
  4. 一个线程消费kafka数据,消费后将数据传给其他线程处理
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/llflilongfei/article/details/81483509
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-01 19:50:33
  • 阅读 ( 1160 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢