kafka - Go语言中文社区

kafka


在Centos6或者7上安装Kafka最新版

一、官网

  http://kafka.apache.org/downloads.html

二、Kafka简介

  Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

  类似的组件还有:Azure的ServiceBus、RabbitMQ等,据网上描述,Kafka比RabbitMQ性能强。

三、安装

  1、安装Java

yum install java-1.8.0-openjdk.x86_64

  2、配置JAVA环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

  说明:配置时注意JAVA_HOME后面要加到/jre,这个比较特殊。另外,红色区域可以换成您对应的安装版本的路径。

  3、下载Kafka:http://kafka.apache.org/downloads.html

cd /opt
wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz

  4、解压并进入目录

tar -zvxf ./kafka_2.12-1.1.0.tgz
cd kafka_2.12-1.1.0

  5、启动Zookeeper

  使用安装包中的脚本启动单节点Zookeeper 实例:(参数-daemon表示后台运行)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

  6、启动Kafka服务

bin/kafka-server-start.sh config/server.properties

  7、创建一个测试的Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  查看Topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

  8、产生消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello xingzhu
>hello sindrol

 

  9、消费消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
xingzhu
sindrol

  好了,到此,单台Kafka已经安装完成。

四、集群配置

  1、单机多broker 集群配置

     利用单节点部署多个broker。 不同的broker 设置不同的 id,监听端口及日志目录。 例如:

cp config/server.properties config/server-1.properties 

    编辑配置:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

    启动Kafka服务:

bin/kafka-server-start.sh config/server-1.properties &

    启动多个服务后,可以参考第三节内容,产生和消费消息。

 

  2、多机多broker 集群配置

    分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。 例如: 在10.4.253.22,10.4.253.23,10.4.253.24三台机器部署,Zookeeper配置如下:

initLimit=5
syncLimit=2
server.1=10.4.253.22:2888:3888
server.2=10.4.253.23:2888:3888
server.3=10.4.253.24:2888:3888

    分别配置多个机器上的Kafka服务 设置不同的broke id,zookeeper.connect设置如下:

zookeeper.connect=10.4.253.22:2181,10.4.253.23:2181,10.4.253.24:2181

    启动Zookeeper与Kafka服务,按上文方式产生和消费消息,验证集群功能。

 

 

五、外网访问

  安装完成并启动后,如果想要外网通过外网IP访问,需要在config/service.properites中添加如下修改:

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:port

  修改完成后,重新启动Kafka服务。

 

六、C#调用

  引入库:kafka-net(https://github.com/Jroland/kafka-net)

  1、模拟消费端

复制代码

using KafkaNet;
using KafkaNet.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KafkaClientDemo.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri("http://42.159.154.132:9092"));
            var router = new BrokerRouter(options);
            
            var consumer = new KafkaNet.Consumer(new ConsumerOptions("test", router));

            Console.WriteLine("waiting ...");
            //Consume returns a blocking IEnumerable (ie: never ending stream)
            foreach (var message in consumer.Consume())
            {
                Console.WriteLine("Response: P{0},O{1} : {2}", message.Meta.PartitionId, message.Meta.Offset, Encoding.UTF8.GetString(message.Value, 0, message.Value.Length));
            }
        }
    }
}

复制代码

  2、模拟消息生产端

复制代码

using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KafkaClientDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri("http://42.159.154.132:9092"));
            var router = new BrokerRouter(options);
            var client = new Producer(router);
            //var topic = client.GetTopic("test");

            using (client)
            {
                while (true)
                {
                    Console.Write(">");
                    var text = Console.ReadLine();
                    client.SendMessageAsync("testTopic", new[] { new Message(text, "key_" + DateTime.Now.Ticks) }).Wait();
                  
                    if (text == "exit")
                        break;
                }
            }
        }
    }
}

复制代码

 

  运行效果:

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_42674948/article/details/83960034
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-08 17:09:29
  • 阅读 ( 1510 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢