Kafka flume hdfs zookeeper Hive(一个案例详细整理) - Go语言中文社区

Kafka flume hdfs zookeeper Hive(一个案例详细整理)


   以下内容为一个从数据源的产生,到基于此数据的分析的详细步骤。同时还有每一个涉及到的技术的详细解析以及作用


     首先在这里简单介绍一下项目的流程:数据源的产生,然后将数据源发送到Kafka中,然后通过flume将Kafka中的数据下沉到hive中,hdfs则保存了数据。而zookeeper对这些分布式服务进行了协调。      

@产生数据源:

   第一步:在这里模拟了日志的产生。类似于某一个应用在安装,使用,以及卸载的过程中,向后台发送用户对该应用的所有信息。在这里不给出代码。大致是使用Java语言,利用数组,random类来随机的产生模拟日志的信息。然后把这些数据传递给Kafka。

@数据源下沉到Kafka

    读者可能在这里很疑惑,Kafka到底是什么,他在这里到底起了什么作用,为甚麽要经过Kafka。

   首先介绍Kafka是什么:官方语言,Kafka是一个开源流处理平台,由Scala语言和Java语言编写,Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理。也是为了通过集群来提供实时的消息。

   Kafka的特性:

 - 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。

- 可扩展性:kafka集群支持热扩展

- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

- 高并发:支持数千个客户端同时读写。(这个很重要)

    我的理解,基于这个项目来说,Kafka起到的作用,在这里Kafka被用来记录应该所产生的各种日志,然后把这些信息发布到Kafka的topic中,然后后台(订阅者)通过订阅这些topic来做实时(!!!!)的分析。

如果想要深入了解Kafka,我推荐一个网站:https://www.cnblogs.com/cxxjohnson/p/8921661.html,这里写的很好。 

  回到项目:下面的是数据下沉到Kafka的简单步骤。(详细代码在这里不给出,如果读者需要的话,私信我或者在底下评论)

//这里呢,是自己定义的一个方法,功能是将传递进来的数据下沉到Kafka中。
//在这里数据来源是json格式,建议读者也使用json格式数据。 
//该方法第一个参数:Kafka消息发送者,第二个参数,消息接收者,就是传递进来的数据都在topic里面。
//第三个参数就是,传递进来的数据源。
private void sendSingleLog(KafkaProducer<String, String> producer, String topic,AppBaseLog[] logs ) {
    for (AppBaseLog log:logs) {
       
        //将bean对象转换为json
        String logMsg = JSONObject.toJSONString(log);
        //创建待发送消息的对象,这里需要注意的是,往Kafka里面写的数据也是json格式。
        ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic,logMsg);      
        producer.send(data);
    }
    }

@使用flume把数据下沉到hive中

这里疑惑的是,数据不是在Kafka的topic(消息生产者)里面吗,为甚麽直接使用flume呢,(一会告诉你答案)

先告诉大家flume是什么:flume是一个高可用的,高可靠的,分布式的海量日志采集,聚合,传输的系统,flume支持在日志系统中定制各类数据发送方,用于收集数据。同时flume提供对数据进行简单处理,并写到各种数据接收方(可定制)。

如果大家想要深入了解flume是如何实现的这些功能。去自行查资料吧。

  在这个项目里,数据发送方就是Kafka,数据接收方就是hive。(这就是使用flume的答案)

  如果不使用flume可以想象,当数据实时的被传递到Kafka中时,要写多少代码,才能把数据下放到hive中。而flume只需要一个配置文件(flume代理),就可以把我们放到Kafka topic里面数据,直接放到hive里。不需要任何代码。

      这就是flume的一个配置文件。(正如上面的一样,有很多细节我这里不给出,比如这个文件如何创建,在哪创建。如何使用这个文件。如何把数据放到Kafka,等等很多细节。这里都没有给出。本文只提供这些技术在每一步实现的功能。)

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.app.flume.interceptor.LogCollInterceptor$Builder
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
a1.sources.r1.kafka.topics=topic_app_startup,topic_app_error,topic_app_event,topic_app_usage,topic_app_page

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/centos/applogs/%{logType}/%Y%m/%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second

#不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
#控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = DataStream

a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
//在这里告诉大家一个秘密,其实数据被放到了hdfs中。

  @hive

  hive是甚麽:hive是基于Hadoop的一个数据仓库工具(离线),可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。hive是一个建立在Hadoop文件系统上的数据仓库架构。可以用其对hdfs上数据进行分析和管理

为什么使用hive:hive可以自由的扩展集群的规模,一般情况下不需要重启服务。

                            hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。

                            良好的容错性,节点出现问题SQL仍可完成工作。

   我的理解:hive实际上是把hdfs上的文件映射成了表结构。(按照文件格式创建table,然后hive的数据仓库会生成对应的目录,一般默认的仓库路径是:user/hive/warehouse/tablename(tablename是我们创建表的名字)),这时只要将符合table定义的文件加载到该目录就可以通过hql语句对整个目录的文件进行查询。

在本项目中,我们把日志数据经过一系列传输,最终发给到hive。而hive把数据结构化成了表,以表的形式显示数据,我们可以使用HQL对表进行查询。

 

在这里可能有一些疑惑,就是经过一系列的处理。我们没有对数据进行处理,只不过是把数据进行了结构化(数据存储在分布式文件系统HDFS中)。把数据变成了我们可以处理的数据。如果是T级别的数据。数据存储的压力是不是过大呢。不过这里的好处是,我们可以对这些数据进行分析了 ,而不是一些无用的数据。而大数据最终目的也是对海量数据进行挖掘。在海量数据中得到规律。

@HDFS

    hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件上的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX(我也不知道是啥)约束,来实现流式读取文件系统数据的目的。

在介绍一下(很重要):HDFS有着高容错性的特点,并且设计用来部署在低廉的硬件上。而且它提供高吞吐量来访问应用程序的数据。适合有着超大数据集的应用程序。可以实现流的形式访问文件系统中的数据。

而在这里HDFS被用作数据保存地点。而保存数据的路径就是hive表的路径。这样在我们通过flume下沉到hive实际上是,下沉到了hdfs中,而下沉到hdfs到的数据。我们可以直接在hive中使用。

@zookeeper

zookeeper是一个分布式的,开发源码的分布式应用程序的协调服务它是一个为分布式应用提供一致性服务的软件。提供的功能包括:配置服务,域名维护,分布式同步,组服务等。zookeeper的目标就是封装好复杂容易出错的关键服务,将简单易用的接口和性能高效,功能稳定的系统提供给用户。

写分布式程序的一个困难就是在于会出现"部分失败"。当一条消息在网络中的两个节点之间传送时,如果出现网络错误,那么这条消息有没有发送呢,这就是部分失败。部分失败是分布式系统固有的特征,因此,使用zookeeper并不能避免部分失败,当然他也不会隐藏部分失败,zookeeper可以提供一组工具,使你在构建分布式应用时能够对部分失败进行处理。

我的理解,zookeeper被设计成分布式程序协调服务,说白了,就是为了协调各个分布式程序(例如:Kafka)服务,让他们在一起好好工作,别处什么乱子。

   接下来呢,给大家一张图纸,理解一下这个过程。

 

 就到这里了。图纸中没有zookeeper。因为zookeeper其协调服务的功能。

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_43301349/article/details/102658933
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-06-06 09:36:50
  • 阅读 ( 1704 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢