kafka生产者和消费者(java示例) - Go语言中文社区

kafka生产者和消费者(java示例)


kafka笔记1--集群搭建:https://blog.csdn.net/u010452388/article/details/84674454

本文主要讲解kafka生产者和消费者的API使用,以及部分配置说明

目录

一 引入依赖

二 生产者

2.1 代码

2.2 生产者配置说明

2.3 结果-生产者

三 消费者

3.1 代码

3.2 消费者配置说明

3.3 结果-消费者


一 引入依赖

这里引入客户端依赖的时候尽量保持与服务端版本一致,不然会出现奇怪的错误,看下服务端版本

从上面的图可以看出,服务端版本为1.1.0版本,前面的2.12是Scala版本,所以客户端引入下面的依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

下图为生产者和消费者的工程结构图

二 生产者

2.1 代码

这里的生产者采用的是每隔1秒钟发送一条消息,总共发送19条

public class KafkaProducerDemo {

    public static Properties kafkaProperties() {

        Properties properties = new Properties();

        /*设置集群kafka的ip地址和端口号*/
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.2.231:9092,192.168.2.232:9092");
        /*发送的消息需要leader确认*/
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        /*用户id*/
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        /*对key进行序列化*/
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        /*对value进行序列化*/
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        /*创建一个kafka生产者*/
        KafkaProducer<Integer, String> kafkaProducer =
                new KafkaProducer<Integer, String>(kafkaProperties());
        /*主题*/
        String topic = "test";
        /*循环发送数据*/
        for (int i = 0; i < 20; i++) {
            /*发送的消息*/
            String message = "我是一条信息" + i;
            /*发出消息*/
            kafkaProducer.send(new ProducerRecord<>(topic, message));
            System.out.println(message + "->已发送");
            Thread.sleep(1000);
        }

    }
}

2.2 配置-生产者

官网生产者配置详情:http://kafka.apache.org/11/documentation.html#producerconfigs

下面是根据官网的配置进行翻译的,如果有翻译的不对的,可以留言沟通

属性名 描述 类型 默认值 有效值 重要程度
key.serializer

为key序列化的类,这个类要实现org.apache.kafka.common.serialization.Serializer

这个接口 

class    
value.serializer

为value序列化的类,这个类要实现org.apache.kafka.common.serialization.Serializer

这个接口

class    
acks

在请求完成之前,生产者要求kafka集群的leader已经接收的确认数,这个可以控制发送记录的持久化能力(我们可以认为持久能力越高,也就是数据丢失率就越低)

  • acks=0 ,生产者不会等待服务器的任何确认,记录将会立即添加到套接字缓存以及等待发送,在这种情况下,服务器不能保证会接收到记录,以及retries配置将不会生效(因为客户端不想知道任何失败的情况)。每条记录返回的偏移量始终为-1,
    结:性能高,但是会数据丢失
  • acks=1,集群中的leader会把记录写到本地日志中,然后响应返回,并不会等待所有followers确认。在这种情况下,leader确认过记录之后突然宕机了,所有的followers还没来得及复制记录,这个时候,记录将会丢失
    总结:只需要集群中的leader确认,但是也有可能会数据丢失
  • acks=all,集群中的leader将会等待所有同步副本集去确认这个记录,只要保证至少有一个同步副本还存活着,记录就不会丢失。这是最有效的保证。设置acks=-1的话,会有同样的效果
    总结:需要leader和所有副本集去确认,性能会降低,但是数据最安全
string 1 [all,-1,0,1]
bootstrap.servers

主机IP/端口成对的一组列表,用来与kafka集群建立初始化连接。这个列表的的形式为:host1:port1,host2:port2,...。因为这些服务端列表只是用来初始化连接并用来发现集群的成员的(可以动态的改变),所以这些列表不是必须包含所有服务器的集合(你应该超过1个,因为,一个服务器有可能宕机)

list ""  
buffer.memory

总的缓存大小(单位:字节),生产者用来缓存发送的记录,此记录会等待被发送到服务端。如果生产的记录速度超过发送到服务端的速度,生产者可能会阻塞,阻塞的时间超过max.block.ms配置的值的话,就会抛出异常。

这个设置应该大致的符合生产者需要使用的总缓存空间,但不是一个硬性限制,因为不是所有的缓存空间都是给生产者用作缓存。一些额外的空间既要被用作压缩(如果compression是enabled的话)也要维持快速的请求。

long 33554432 [0,...]
compression.type

为生产者数据提供的压缩类型。默认是none(也就是没有压缩)。有效值为none,gzip,snappy,or lz4.压缩是针对数据的批量压缩的,所以批量数据的效率将会影响压缩的效率(批量数据越多意味着越好的压缩)

string none  
retries

设置为大于0的值的时候,如果客户端发送任何记录遇到临时的错误的话,客户端会重新发送。没有将max.in.flight.requests.per.connection配置成1的话重新发送可能潜在的改变记录的发送顺序,因为如果两批记录发送到单一的分区(partition),第一批失败了,正在重新发送,但是第二批成功了,那么第二批的记录有可能出现在前面

int 0 [0,...,2147483647]
ssl.key.password

key存储文件的私钥密码。对于客户端来说,可选参数

password null  

ssl.keystore.loca

tion

key存储文件的位置。对于客户端来说,可选参数,以及可以被用作对客户端的双向认证

string null  

ssl.keystore.pass

word

key存储文件的存储密码。对客户端来说,可选参数,只有ssl.keystore.location配置了才需要此参数

password null  

ssl.truststore.loc

ation

信任存储文件的位置

string null  

ssl.truststore.pass

word

信任存储文件的密码,如果一个密码没有被设置可访问信任存储,这个密码也是有效的,但是完整性的检查是无效的

password null  
batch.size

每当有多个记录要发送到同样的分区的时候,生产者将尝试将记录批处理到一起以至于减少请求。这有助于客户端和服务端的性能。这个配置默认单位是字节。

记录大于这个大小的话,不会尝试去批处理。

小批量处理不怎么常见,并有可能减少吞吐量(批处理大小为0将禁止使用批处理)。

一个非常大的批量大小会使用很多内存,会造成浪费,在预计额外记录的情况下,因为我们总是分配指定的缓冲大小

int 16384 [0,...]
           
           


1.acks(默认为1)
    "0":消息发送给broker以后,不需要确认(性能高,但是会出现数据丢失)
    "1":只需要获得kafka集群中leader节点的确认即可返回(leader/follower)
    all或者"-1" 需要ISR中的所有Replica(副本)进行确认(需要集群中所有节点确认)

2.batch.size(默认16KB) 调优的重要参数
    producer对于同一个分区来说,会按照batch.size的大小进行统一收集,然后批量发送
    就是说我们如果发送的消息,不会直接发出去,等达到batch.size之后,再发出去

3.linger.ms(默认0毫秒)
    延迟发送,假如设置的值是1000的话,就是每隔1秒钟积累之前的信息,然后再发送
    
4.max.request.size(默认1M)
    消息最大发送的字节数,超过默认值1M的话,就会拒绝,抛异常

 

2.3 结果展示

发送的结果如下图:

 

三 消费者

3.1 代码

public class KafkaConsumerDemo extends Thread {

    private final KafkaConsumer<Integer,String> consumer;

    public KafkaConsumerDemo(String topic) {
        Properties props = new Properties();
        /*设置集群kafka的ip地址和端口号*/
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.2.231:9092,192.168.2.232:9092");
        /*设置消费者的group.id*/
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo1");
        /*消费信息以后自动提交*/
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        /*控制消费者提交的频率*/
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        /*key反序列化*/
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        /*value反序列化*/
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        /*创建kafka消费者对象*/
        consumer = new KafkaConsumer<Integer, String>(props);
        /*订阅主题*/
        consumer.subscribe(Collections.singleton(topic));

    }

    @Override
    public void run() {
        while (true) {
            /*每隔一段时间到服务器拉取数据*/
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord record : consumerRecords) {
                System.out.println(record.value());
            }
        }
    }
    public static void main(String[] args) {
        new KafkaConsumerDemo("test").start();
    }
}

3.2 配置说明

1.group.id
    同一个gropu.id中的消费者,只能有一个消费者可以消费到信息
    但是不同的group.id都会去消费消息(消息是持久化的)
    
2.enable.auto.commit 
    如果位true的话,消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到

3.auto.commit.interval.ms
    控制消费者提交的频率,默认单位是毫秒,一般配合enable.auto.commit

4.auto.offset.reset
    这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来
    消费指定的 topic 时,对于该参数的配置,会有不同的语义

    auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的offset 处开始消费 Topic 下的消息
    auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始消费
    auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
    
5.max.poll.records
    设置限制每次消费者poll返回的消息数,通过调整此值,可以减少poll的间隔

官网消费者配置详情:http://kafka.apache.org/11/documentation.html#consumerconfigs

3.3 结果展示

消费的结果如下图:

 

 

 

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/u010452388/article/details/84715684
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-01 19:48:03
  • 阅读 ( 1236 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢