社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
pom文件中,添加客户端jar包
<dependency>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>2.1.0</version>
</dependency>
示例代码:
配置信息
final Configuration configuration = new Configuration(“http://hostname.for.kafka-connect.service.com:8083”);
新建KafkaConnectClient实例
final KafkaConnectClient client = new KafkaConnectClient(configuration);
查看连接器的列表
final Collection connectorList = client.getConnectors();
创建一个VerifiableSource的连接器
final ConnectorDefinition connectorDefition = client.addConnector(NewConnectorDefinition.newBuilder()
.withName(“MyNewConnector”)
.withConfig(“connector.class”, “org.apache.kafka.connect.tools.VerifiableSourceConnector”)
.withConfig(“tasks.max”, 3)
.withConfig(“topics”, “test-topic”)
.build()
));
//创建elasticSearchSink的示例代码
final ConnectorDefinition connectorDefition = client.addConnector(NewConnectorDefinition.newBuilder()
.withName(“elasticSearchConnector”)
.withConfig(“connector.class”, “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”)
.withConfig(“tasks.max”, 1)
.withConfig(“type.name”, “mysql-data”)
.withConfig(“connection.url”,“http://10.10.77.142:9200”)
.withConfig(“topics”,“ziqi.login”)
.withConfig(“key.ignore”,“true”)
.build()
);
//创建一个mysqlSource的连接器
final ConnectorDefinition connectorDefition = client.addConnector(NewConnectorDefinition.newBuilder()
.withName("mysqlConnector")
.withConfig("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector")
.withConfig("tasks.max", 1)
.withConfig("connection.url","jdbc:mysql://10.10.77.138:3306/kafka-test?user=root&password=supconit")
.withConfig("mode","incrementing").withConfig("incrementing.column.id","id")
.withConfig("topic.prefix","ziqi.")
.build()
);
数据库中的数据
通过上面的mysqlSource创建完成,我们查看topic为ziqi.login的数据发现,已经将数据库中的数据导入到kafka主题中
然后我们查看es的数据,已经成功的将mysql库中的数据导入到elasticSearch中
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!