In a single-node Kafka environment, I tried creating a Kafka Producer and Consumer in Java for testing. The setup is as follows:
The project uses Maven. The relevant part of pom.xml is:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!--<scope>test</scope>-->
</dependency>
The code for a simple Producer is as follows:
package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProducer {
    private static Properties kafkaProps;

    private static void initKafka() {
        kafkaProps = new Properties();
        // broker address(es) used for the initial connection
        kafkaProps.put("bootstrap.servers", "localhost:9092"); //,192.168.216.139:9092,192.168.216.140:9092
        // wait until all in-sync replicas have acknowledged each record
        kafkaProps.put("acks", "all");
        // number of retries after a failed send (0 = no retry)
        kafkaProps.put("retries", 0);
        // maximum size in bytes of one batch of records per partition
        kafkaProps.put("batch.size", 16384);
        // wait up to 1 ms for more records before sending a batch
        kafkaProps.put("linger.ms", 1);
        // total memory in bytes available for buffering unsent records
        kafkaProps.put("buffer.memory", 33554432);
        // serializers for record keys and values
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
        initKafka();
        Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
        for (int i = 0; i < 10; i++) {
            // send() is asynchronous: it only queues the record for sending
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully!");
        // close() waits for queued records to be sent before returning
        producer.close();
    }
}
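Note on the bootstrap.servers setting: with Kafka's default single-node configuration, connecting by IP address failed with a connection error in this setup, while localhost worked. A likely cause is that listeners / advertised.listeners in the broker's server.properties are not set to that IP address.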
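One way to switch between localhost and an IP address without editing the code is to read the broker list from a JVM system property. The following is a minimal sketch, not part of the original project: the property name `kafka.bootstrap` and the class `BootstrapServers` are hypothetical names chosen for illustration, and nothing here is Kafka API.

```java
import java.util.Properties;

public class BootstrapServers {
    // Default used in this article's single-node setup.
    static final String DEFAULT_SERVERS = "localhost:9092";

    // Returns the trimmed override if it is non-blank, otherwise the localhost default.
    static String resolve(String override) {
        if (override == null || override.trim().isEmpty()) {
            return DEFAULT_SERVERS;
        }
        return override.trim();
    }

    // Copies the given properties and sets bootstrap.servers on the copy.
    static Properties withBootstrap(Properties base, String override) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("bootstrap.servers", resolve(override));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical usage: java -Dkafka.bootstrap=192.168.216.139:9092 BootstrapServers
        Properties props = withBootstrap(new Properties(), System.getProperty("kafka.bootstrap"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object could be passed to the KafkaProducer or KafkaConsumer constructor in place of the hard-coded value. An IP address will still only work if the broker accepts and advertises connections on it.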
Several of the parameters used in the code are explained here:
The code for a simple Consumer is as follows:
package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TestConsumer {
    private static Properties kafkaProps = new Properties();

    private static void kafkaInit() {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        // consumer group this consumer belongs to
        kafkaProps.put("group.id", "test");
        // commit consumed offsets automatically in the background
        kafkaProps.put("enable.auto.commit", "true");
        // interval in ms between automatic offset commits
        kafkaProps.put("auto.commit.interval.ms", "1000");
        // time in ms without a heartbeat before the consumer is considered dead
        kafkaProps.put("session.timeout.ms", "30000");
        // deserializers for record keys and values
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        kafkaInit();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
        System.out.println("Subscribed to topic:" + "my-topic");
        while (true) {
            // wait up to 100 ms for new records; the result is empty if none arrive
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // print the offset, key and value of each consumed record
                System.out.printf("Offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
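After the Producer runs successfully, part of the output is as follows:
As shown, the values set in the producer's initKafka method appear under ProducerConfig values.
After the Consumer runs successfully, the keys and values sent by the producer appear in the consumer's output: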
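As a rough illustration of the line format only, the sketch below applies the consumer's format string to the ten key/value pairs that the producer sends. The offsets 0 to 9 are an assumption (a fresh topic with a single partition); real offsets and ordering depend on the topic's state and partition count. The class `ConsumerOutputSketch` and its methods are hypothetical and do not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumerOutputSketch {
    // Formats one record like the consumer's printf call, without the trailing line break.
    static String formatRecord(long offset, String key, String value) {
        return String.format("Offset = %d, key = %s, value = %s", offset, key, value);
    }

    // Builds lines for keys/values "0".."count-1"; offsets are ASSUMED to start at 0.
    static List<String> expectedLines(int count) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lines.add(formatRecord(i, Integer.toString(i), Integer.toString(i)));
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : expectedLines(10)) {
            System.out.println(line);
        }
    }
}
```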
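While running the producer, the following error occurred:
The reference URL given in the error message explains the problem:
The fix is to modify pom.xml: add the slf4j dependencies, and in the slf4j-log4j12 dependency comment out the <scope>test</scope> line: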
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!--<scope>test</scope>-->
</dependency>
After this change, rerun the producer program; it now runs normally.
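To check quickly whether a logging binding is visible at runtime, a small reflection probe can help. This is only a sketch, under the assumption that the error was SLF4J's missing-binding warning, which in SLF4J 1.7.x refers to the class `org.slf4j.impl.StaticLoggerBinder`. The class name `Slf4jBindingCheck` and its method are hypothetical.

```java
public class Slf4jBindingCheck {
    // Returns true if the named class can be located on the classpath (without initializing it).
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, Slf4jBindingCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: SLF4J 1.7.x bindings such as slf4j-log4j12 provide this class.
        String binder = "org.slf4j.impl.StaticLoggerBinder";
        System.out.println(binder + " present: " + isOnClasspath(binder));
    }
}
```

If the probe reports the class as absent when run with the project's runtime classpath, the binding dependency is probably missing or limited to the test scope.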