Kafka 2.0的简单Producer和Consumer实现 - Go语言中文社区

Kafka 2.0的简单Producer和Consumer实现


系统环境

在kafka单节点运行环境下,尝试使用java创建Kafka的Producer和Consumer进行测试,具体的代码环境如下:

  • OS:Ubuntu 16.4
  • Kafka:2.11_2.0.0
  • Zookeeper:使用Kafka中自带的Zookeeper进行启动
  • JDK: 1.8

项目使用maven,其中pom.xml的相关内容如下:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <!--<scope>test</scope>-->
</dependency>

简单Producer的实现

简单Producer的代码如下:

package kafka;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class TestProducer {
 
    private static Properties kafkaProps;
 
    private static void initKafka() {
        kafkaProps = new Properties();
        // broker url
        kafkaProps.put("bootstrap.servers", "localhost:9092"); //,192.168.216.139:9092,192.168.216.140:9092
        // request need to validate
        kafkaProps.put("acks", "all");
        // request failed to try
        kafkaProps.put("retries", 0);
        // memory cache size
        kafkaProps.put("batch.size", 16384);
        //
        kafkaProps.put("linger.ms", 1);
        kafkaProps.put("buffer.memory", 33554432);
        // define the way of key and value serializer
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
    }
 
    public static void main(String[] args) {
        initKafka();
        Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully!");
        producer.close();
    }
}

代码中需要注意的是:bootstrap.servers的配置项,在默认kafka的单节点配置时,不能使用IP,而是使用localhost进行连接,否则会连接异常。

此处对代码中用到的几个参数进行解释:

  • bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;
  • acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。
  • retries:生产者发送失败后,重试的次数 batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。
  • linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。
  • batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。
  • buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。
  • key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

简单Consumer的实现

简单Consumer的代码如下:

package kafka;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.Collections;
import java.util.Properties;
 
public class TestConsumer {
 
    private static Properties kafkaProps = new Properties();
 
    private static void kafkaInit() {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        // group id for each consumer
        kafkaProps.put("group.id", "test");
        // if value legal, auto add offset
        kafkaProps.put("enable.auto.commit", "true");
        // set how long time to udpate the offset value
        kafkaProps.put("auto.commit.interval.ms", "1000");
        // set session response time
        kafkaProps.put("session.timeout.ms", "30000");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }
 
    public static void main(String[] args) {
        kafkaInit();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
        System.out.println("Subscribed to topic:" + "my-topic");
 
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // ?
            for (ConsumerRecord<String, String> record : records) {
                // print the offset, key and value for the consumer records
                System.out.printf("Offset = %d, key = %s, value = %sn", record.offset(), record.key(), record.value());
            }
        }
    }
}

运行情况

Producer的成功运行后,部分输出如下:

可以看到,在producer的initKafka的相关配置项的值出现在ProducerConfig values中。

image

Consumer成功运行后,可以看到在producer中send的相关key和value值,在consumer的输出中出现:

image

遇到的问题

在producer运行时,出现如下错误:

image

在提示的参考URL页面中,可以找到相关问题的说明:

image

具体的解决方法为,修改pom.xml文件:在pom.xml文件中加入slf4j的相关引用,并将slf4j-log4j12引用中:

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <!--<scope>test</scope>-->
</dependency>

修改完成后,重新运行producer程序,可以正常运行。

资料参考

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/yitian_z/article/details/87909862
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-06 23:11:58
  • 阅读 ( 998 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢