kafka_2.12-1.1.0 生产与消费java实现示例 - Go语言中文社区

kafka_2.12-1.1.0 生产与消费java实现示例


kafka_2.12-1.1.0 生产与消费java实现示例

环境准备:

1)需要在maven工程中引入依赖:

复制代码

 1  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
 2     <dependency>
 3       <groupId>org.apache.kafka</groupId>
 4       <artifactId>kafka_2.12</artifactId>
 5       <version>1.1.0</version>
 6     </dependency>
 7     <dependency>
 8       <groupId> org.apache.cassandra</groupId>
 9       <artifactId>cassandra-all</artifactId>
10       <version>0.8.1</version>
11 
12       <exclusions>
13         <exclusion>
14           <groupId>org.slf4j</groupId>
15           <artifactId>slf4j-log4j12</artifactId>
16         </exclusion>
17         <exclusion>
18           <groupId>log4j</groupId>
19           <artifactId>log4j</artifactId>
20         </exclusion>
21       </exclusions>
22 
23     </dependency>

复制代码

2)本机是否能telnet 192.178.0.111 9092(kafaka所部署的vmw虚拟机)通? 如果telnet端口不通,则需要关闭192.178.0.111的防火墙:

systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动

一、生产者

首先看以下两种实现示例:

复制代码

package com.dx;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public class ProducerTest {
    public static void main(String[] args) {
        producer_test1(args);

        producer_test2();
    }

    private static void producer_test2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.178.0.111:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for(int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("kafakatopic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }

    private static void producer_test1(String[] args) {
        String arg0 = args != null && args.length > 0 ? args[0] : "10";
        long events = Long.parseLong(arg0);
        Random rnd = new Random();

        //    /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.178.0.111:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 配置partitionner选择策略,可选配置
        props.put("partitioner.class", "com.dx.SimplePartitioner2");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.178.0." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<String, String>("kafakatopic", ip, msg);
            Future<RecordMetadata> send = producer.send(data,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    });
        }
        producer.close();
    }
}

复制代码

SimplePartitioner2.java

按 Ctrl+C 复制代码

 

按 Ctrl+C 复制代码

参数设置备注:

1)bootstrap.servers --设置生产者需要连接的kafka地址
2)acks --回令类型
3)retries --重试次数
4)batch.size --批量提交大小
5)linger.ms --提交延迟等待时间(等待时间内可以追加提交)
6)buffer.memory --缓存大小
7)key.serializer|value.serializer --序列化方法

需要注意的有两点:
1、acks回令。如果必须等待回令,那么设置acks为all;否则,设置为-1;等待回令会有性能损耗。
2、生产者在发送消息的过程中,会自己默认批量提交。所以,如果单条指令的发送请求,记得发送完后flush才能生效。

3、SimplePartitioner2.java为kafaka分区,可选项。

二、消费者

以下实现示例:

复制代码

package com.dx;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.List;

/**
 * zk启动:sh /opt/zookeeper-3.4.11/bin/zkServer.sh start &
 * kafka启动:sh /opt/kafka_2.12-1.1.0/bin/kafka-server-start.sh /opt/kafka_2.12-1.1.0/config/server.properties &
 */
public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.178.0.111:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG ,"test") ;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("kafakatopic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

复制代码

三、测试

先启动productor运行,之后在启动consumer运行。在consumer打印结果如下:

复制代码

offset = 200, key = 192.178.0.20, value = 1522587459181,www.example.com,192.178.0.20
offset = 201, key = 192.178.0.143, value = 1522587459359,www.example.com,192.178.0.143
offset = 202, key = 192.178.0.113, value = 1522587459359,www.example.com,192.178.0.113
offset = 203, key = 192.178.0.110, value = 1522587459359,www.example.com,192.178.0.110
offset = 204, key = 192.178.0.232, value = 1522587459359,www.example.com,192.178.0.232
offset = 205, key = 192.178.0.96, value = 1522587459359,www.example.com,192.178.0.96
offset = 206, key = 192.178.0.76, value = 1522587459360,www.example.com,192.178.0.76
offset = 207, key = 192.178.0.78, value = 1522587459360,www.example.com,192.178.0.78
offset = 208, key = 192.178.0.80, value = 1522587459360,www.example.com,192.178.0.80
offset = 209, key = 192.178.0.177, value = 1522587459360,www.example.com,192.178.0.177
offset = 210, key = 0, value = 0
offset = 211, key = 1, value = 1
offset = 212, key = 2, value = 2
offset = 213, key = 3, value = 3
offset = 214, key = 4, value = 4
offset = 215, key = 5, value = 5
offset = 216, key = 6, value = 6
offset = 217, key = 7, value = 7
offset = 218, key = 8, value = 8
offset = 219, key = 9, value = 9

复制代码

 

 


感谢您的阅读,如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮。本文欢迎各位转载,但是转载文章之后必须在文章页面中给出作者和原文连接

基础才是编程人员应该深入研究的问题,比如:
1)List/Set/Map内部组成原理|区别
2)mysql索引存储结构&如何调优/b-tree特点、计算复杂度及影响复杂度的因素。。。
3)JVM运行组成与原理及调优
4)Java类加载器运行原理
5)Java中GC过程原理|使用的回收算法原理
6)Redis中hash一致性实现及与hash其他区别
7)Java多线程、线程池开发、管理Lock与Synchroined区别
8)Spring IOC/AOP 原理;加载过程的。。。
+加关注】。

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/e_wsq/article/details/81737919
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-02 18:14:44
  • 阅读 ( 1384 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢