kafka进击之路(五) ——producer API开发 - Go语言中文社区

kafka进击之路(五) ——producer API开发


说明

kafka版本:kafka_2.10-0.8.2.1(kafka0.9.xx版本提供了新的API)
IED环境:intellij14 + maven3.3
开发语言:java

kafka producer API开发

kafka producer的开发使用起来要不consumer简单的多,不过我们要知道producer有两种方式,一种是同步,一种是异步。两种producer的发送结构如下:
1. 同步producer
这里写图片描述
2. 异步producer
这里写图片描述

下面我们就写一个简单的异步kafka producer示例,在这里,我们额外对producer做一下封装,封装成一个单例模式的服务来供向kafka发送数据。

maven依赖

      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
      </dependency>

java包组成

这里写图片描述

配置文件producer.properties

bootstrap.servers=xxxx:9092,xxxx:9092,xxxx:9092
batch.size=1
retries=0
acks=1
block.on.buffer.full=true
timeout.ms=30000
reconnect.backoff.ms=10
retry.backoff.ms = 100
max.in.flight.requests.per.connection = 5

buffer.memory = 33554432
send.buffer.bytes=131072
receive.buffer.bytes=32768
max.request.size=1048576

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

KafkaProducerAdapter.java代码

package kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.FileInputStream;
import java.util.Properties;


public class KafkaProducerAdapter {
    private static final Logger logger = LogManager.getLogger(KafkaProducerAdapter.class);

    private static KafkaProducer kafkaProducer = null;

    private static KafkaProducerAdapter kafkaProducerAdapter = null;

    private KafkaProducerAdapter() { }

    /**
     * 单例
     * @return
     */
    public static KafkaProducerAdapter getInstance() {
        if (kafkaProducerAdapter == null) {
            synchronized (KafkaProducerAdapter.class) {
                if (kafkaProducerAdapter == null) {
                    kafkaProducerAdapter = new KafkaProducerAdapter();
                }
            }
        }
        return kafkaProducerAdapter;
    }

    public void init(String configFile) throws Exception {
        try {
            Properties properties = new Properties();
            properties.load(new FileInputStream(configFile));
            kafkaProducer = new KafkaProducer(properties);
        } catch (Exception e) {
            throw new Exception("init kafka producer exception:" + e.getMessage());
        }
    }

    /**
     * 发送一条消息
     * @param message
     */
    public void send(String topic, String message) {
        ProducerRecord record;
        record = new ProducerRecord<>(topic, "", message);
        kafkaProducer.send(record, new SendCallback(record, 0));
    }

    /**
     * producer回调
     */
    static class SendCallback implements Callback {
        ProducerRecord<String, String> record;
        int sendSeq = 0;

        public SendCallback(ProducerRecord record, int sendSeq) {
            this.record = record;
            this.sendSeq = sendSeq;
        }
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            //send success
            if (null == e) {
                String meta = "topic:" + recordMetadata.topic() + ", partition:"
                        + recordMetadata.topic() + ", offset:" + recordMetadata.offset();
                logger.info("send message success, record:" + record.toString() + ", meta:" + meta);
                return;
            }
            //send failed
            logger.error("send message failed, seq:" + sendSeq + ", record:" + record.toString() + ", errmsg:" + e.getMessage());
            if (sendSeq < 1) {
                kafkaProducer.send(record, new SendCallback(record, ++sendSeq));
            }
        }
    }
}

Main.java代码

package kafka.producer;

public class Main {
    public static void main(String[] args) {
        try {
            KafkaProducerAdapter producer = KafkaProducerAdapter.getInstance();
            producer.init("./producer.properties");

            // send some message
            producer.send("test");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

更多可参考:
1. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_35799003/article/details/52238744
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-06-20 17:21:19
  • 阅读 ( 1299 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢