38. Big Data Journey: Website Traffic Statistics Project, Real-Time Processing System (Kafka, Storm, HBase)


Hive Placeholders and Script File Invocation


Overview
In the work above, the HQL statements for the offline ETL had to be written and run by hand. Doing that manually every day is clearly impractical, so Hive's script-file invocation and placeholders (variable substitution) can be used to automate it.

Invoking Hive from a Script File
Steps:
1) Write a file with the .hive suffix.
For example, create a file named 01.hive
whose purpose is to create a table tb1 in Hive's weblog database.

Example contents of 01.hive:

use weblog;
create table tb1 (id int, name string);

2) Go to the bin directory of the Hive installation
and run: sh hive -f 01.hive
(Note: the argument after -f is the path to the 01.hive file.)

3) Check in Hive that the table was created.
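For example, a quick check from the command line (a minimal sketch using hive -e; the database and table are the ones created above):

sh hive -e "use weblog; show tables;"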

Using Hive Placeholders
Suppose we now want to drop the table tb1 through a Hive script file.
We can do it like this:

1) Create a file 02.hive
Example contents:

use  weblog;
drop table ${tb_name};

2) In the bin directory, run:

sh hive -f 02.hive -d tb_name="tb1"
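Here -d (--define) supplies the value for the placeholder, so with tb_name="tb1" the drop statement Hive actually executes after substitution is:

drop table tb1;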

Putting It Together with the Business Logic
When the final insert into the statistics table is run in Hive, the log data is partitioned by day, so the date (for example 2017-8-20) had to be typed by hand.

Now that we have Hive file invocation and placeholders, we can do the following:

1) Replace the date values in the HQL with a placeholder (quoted, so that the substituted date is treated as a string) and save the statement in a weblog.hive file.
Example:

use weblog;
insert overwrite table tongji
select '${reportTime}', tab1.pv, tab2.uv, tab3.vv, tab4.br, tab5.newip, tab6.newcust, tab7.avgtime, tab8.avgdeep
from
  (select count(*) as pv from dataclear where reportTime = '${reportTime}') as tab1,
  (select count(distinct uvid) as uv from dataclear where reportTime = '${reportTime}') as tab2,
  (select count(distinct ssid) as vv from dataclear where reportTime = '${reportTime}') as tab3,
  (select round(br_taba.a / br_tabb.b, 4) as br from
    (select count(*) as a from
      (select ssid from dataclear where reportTime = '${reportTime}' group by ssid having count(ssid) = 1) as br_tab) as br_taba,
    (select count(distinct ssid) as b from dataclear where reportTime = '${reportTime}') as br_tabb) as tab4,
  (select count(distinct dataclear.cip) as newip from dataclear
    where dataclear.reportTime = '${reportTime}'
      and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '${reportTime}')) as tab5,
  (select count(distinct dataclear.uvid) as newcust from dataclear
    where dataclear.reportTime = '${reportTime}'
      and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '${reportTime}')) as tab6,
  (select round(avg(atTab.usetime), 4) as avgtime from
    (select max(sstime) - min(sstime) as usetime from dataclear where reportTime = '${reportTime}' group by ssid) as atTab) as tab7,
  (select round(avg(deep), 4) as avgdeep from
    (select count(distinct urlname) as deep from dataclear where reportTime = '${reportTime}' group by ssid) as adTab) as tab8;
 

2) From Hive's bin directory, run:

sh hive -f  weblog.hive -d reportTime="2017-8-20"

If you don't want to type the date by hand, it can be obtained with a Linux command:

date "+%G-%m-%d"

(Note: %G is the ISO week-based year; %Y gives the calendar year and is usually what is wanted.)

So the Hive script can be invoked like this:

sh hive -f 03.hive -d reportTime=`date "+%G-%m-%d"`

(Note: those are backticks, the key at the top left of the keyboard.)

It can also be written as:

sh hive -f 03.hive -d reportTime=$(date "+%G-%m-%d")

Linux Crontab Scheduled Tasks
At work, the database needs to be backed up automatically every day at midnight, so a scheduled task has to be set up.
The crontab command schedules commands to run at fixed times or intervals.

Scheduled tasks can be edited with crontab -e.

crontab entry format:

*       *      *     *      *      command
minute  hour   day   month  week   command

For example, to run the Hive statistics script every day at midnight:

0 0 * * * /home/software/hive/bin/hive -f /home/software/hive/bin/03.hive -d reportTime=`date "+\%G-\%m-\%d"`

(Note: inside a crontab entry, % is special and must be escaped as \%.)

Run a task once every minute.
Example:

*/1 * * * * rm -rf /home/software/1.txt

This deletes the 1.txt file in the specified directory once a minute.
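After saving the entry, the installed schedule can be listed with the standard crontab option:

crontab -l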

Building the Real-Time Processing System


Steps:
1. Start the ZooKeeper cluster
2. Start the Kafka cluster
Command: sh kafka-server-start.sh ../config/server.properties
3. Configure the Flume agent

(If stale metadata is left over from earlier runs, the following nodes can be removed in the ZooKeeper client, zkCli.sh, before Kafka is restarted:)

rmr /brokers
rmr /admin
rmr /isr_change_notification
rmr /controller_epoch
rmr /rmstore
rmr /consumers
rmr /config

Flume agent configuration example:

a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2
 
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
 
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.234.11:9000/weblog/reportTime=%Y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=1000
 
a1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.brokerList=192.168.234.21:9092
a1.sinks.k2.topic=weblog
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
 
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
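
Once this configuration is saved (the file name weblog.conf below is an assumption), the agent can be started from Flume's bin directory in the usual way:

sh flume-ng agent -n a1 -c ../conf -f ../conf/weblog.conf -Dflume.root.logger=INFO,console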
 

Start Kafka (if it is not already running): sh kafka-server-start.sh ../config/server.properties
4. Create the Kafka topic
Run:

sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 2 --topic weblog

5. Create a Kafka consumer and check that messages can be received
Run: sh kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic weblog
6. Run the test:
visit a page -> Flume -> Kafka
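The topic and the consumer can also be exercised directly with the Kafka command-line tools, independently of the page and Flume (standard Kafka CLI usage; the broker address is the one from the Flume sink configuration above):

sh kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic weblog
sh kafka-console-producer.sh --broker-list 192.168.234.21:9092 --topic weblog

Anything typed into the console producer should show up in the console consumer started in step 5.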

Building the Kafka-Storm Layer


Steps:

1. Create a Java project.
2. Import the Storm jars, the Kafka jars, the storm-kafka jars, and their transitive dependencies (see the Maven sketch after this list).
3. Remove duplicate jars (note: where two versions of the same jar conflict, delete the 1.75 version and keep the 1.72 one).
4. Write the code.
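If the project is built with Maven instead of copying jars by hand, the dependencies might look roughly like this (a sketch only; the artifact versions are assumptions and must match the Storm/Kafka versions actually installed):

<dependencies>
    <!-- Storm core -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
    </dependency>
    <!-- old storm-kafka integration (the ZkHosts/SpoutConfig/KafkaSpout API used below) -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.6</version>
    </dependency>
    <!-- Kafka classes required by storm-kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.2</version>
    </dependency>
</dependencies>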

Storm Processing Logic
Data cleaning:
remove the redundant fields, keep only the useful ones, and split the ss field
url, urlname, uvid, ssid, sscount, sstime, cip

WebLogTopology code:

public class WebLogTopology {
    public static void main(String[] args) throws Exception {

        // 1. Specify the ZooKeeper cluster address
        BrokerHosts hosts = new ZkHosts("192.168.234.21:2181,192.168.234.22:2181,192.168.234.23:2181");

        // -- set hosts, topic and zkRoot (the zkRoot must already exist in ZooKeeper;
        // -- in this example, register the /weblog/info node in ZooKeeper beforehand)
        SpoutConfig conf = new SpoutConfig(hosts, "weblog", "/weblog", "info");
        conf.scheme = new SchemeAsMultiScheme(new StringScheme());

        // -- create the KafkaSpout that receives messages from Kafka
        KafkaSpout spout = new KafkaSpout(conf);
        // -- create the bolt
        PrintBolt printBolt = new PrintBolt();

        // 2. Create the topology builder
        TopologyBuilder builder = new TopologyBuilder();

        // 3. Wire up the topology
        builder.setSpout("Weblog_spout", spout);
        builder.setBolt("Print_Bolt", printBolt).shuffleGrouping("Weblog_spout");

        // 4. Build the topology
        StormTopology topology = builder.createTopology();

        // 5. Submit it to a local cluster for testing
        LocalCluster cluster = new LocalCluster();
        Config config = new Config();
        cluster.submitTopology("Weblog_Topology", config, topology);

        Thread.sleep(100 * 1000);
        cluster.killTopology("Weblog_Topology");
        cluster.shutdown();
    }
}

PrintBolt code:

public class PrintBolt extends BaseRichBolt {

    private OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            Fields fields = input.getFields();
            StringBuffer buf = new StringBuffer();

            Iterator<String> it = fields.iterator();
            while (it.hasNext()) {
                String key = it.next();
                Object value = input.getValueByField(key);
                buf.append(key + ":" + value);
            }
            System.out.println(buf.toString());
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

Test: every time the page is visited, Storm receives one message and prints it.

Storm Business Processing (Part 1)


Data cleaning
When Storm receives a record such as the following:

str:http://localhost:8090/Demo/b.jsp|b.jsp|页面B|UTF-8|1024x768|24-bit|zh-cn|0|1|27.0 r0|0.5690286228250014|http://localhost:8090/Demo/a.jsp|Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2783.4 Safari/537.36|78024453153757966560|9974858526_1_1511218867768|0:0:0:0:0:0:0:1

the next step is data cleaning: remove the redundant fields and keep only the useful ones:
url, urlname, uvid, ssid, sscount, sstime, cip

Streaming metrics
pv - page views; every log line counts as one pv, so it can be computed in real time
vv - independent sessions; derived from ssid, it can be computed as a stream
uv - unique visitors; derived from uvid, it can be computed in real time

By contrast:
br - bounce rate, the number of bounced sessions in a day divided by the total number of sessions; not well suited to streaming
avgtime - the average duration of all sessions in a day; not well suited to streaming
avgdeep - average visit depth; not well suited to streaming

In short, not every metric is suitable for real-time processing.

Metric Implementation
ClearBolt code:

public class ClearBolt extends BaseRichBolt {
    private OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("str");
            // "|" is a regex metacharacter, so it has to be escaped when splitting
            String[] attrs = value.split("\\|");
            String url = attrs[0];
            String urlname = attrs[1];
            String uvid = attrs[13];
            // attrs[14] has the form ssid_sscount_sstime
            String[] ss = attrs[14].split("_");
            String ssid = ss[0];
            String sscount = ss[1];
            String sstime = ss[2];
            String cip = attrs[15];
            collector.emit(input, new Values(url, urlname, uvid, ssid, sscount, sstime, cip));
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "urlname", "uvid", "ssid", "sscount", "sstime", "cip"));
    }

}
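As a quick sanity check of the field indexes used above, the sample record from the data-cleaning section can be split in a plain standalone program (a sketch only; the class name is made up for illustration and it is not part of the topology):

public class ClearBoltFieldCheck {
    public static void main(String[] args) {
        // the sample record shown in the "Data cleaning" section
        String value = "http://localhost:8090/Demo/b.jsp|b.jsp|页面B|UTF-8|1024x768|24-bit|zh-cn|0|1|27.0 r0|0.5690286228250014|http://localhost:8090/Demo/a.jsp|Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2783.4 Safari/537.36|78024453153757966560|9974858526_1_1511218867768|0:0:0:0:0:0:0:1";
        String[] attrs = value.split("\\|");   // "|" must be escaped in a regex
        String[] ss = attrs[14].split("_");    // ssid_sscount_sstime
        System.out.println("url     = " + attrs[0]);
        System.out.println("urlname = " + attrs[1]);
        System.out.println("uvid    = " + attrs[13]);
        System.out.println("ssid    = " + ss[0]);
        System.out.println("sscount = " + ss[1]);
        System.out.println("sstime  = " + ss[2]);
        System.out.println("cip     = " + attrs[15]);
    }
}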

PvBolt code:

public class PvBolt extends BaseRichBolt {

    private static int pv = 0;

    private OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            pv++;
            List<Object> values = input.getValues();
            values.add(pv);
            collector.emit(values);
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "urlname", "uvid", "ssid", "sscount", "sstime", "cip", "pv"));
    }

}

VvBolt code:

public class VvBolt extends BaseRichBolt {
    private OutputCollector collector = null;

    private static int vv = 1;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String sscount = input.getStringByField("sscount");
            if (sscount.equals("0")) {
                vv++;
            }
            List<Object> values = input.getValues();
            values.add(vv);
            collector.emit(input, values);
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "urlname", "uvid", "ssid", "sscount", "sstime", "cip", "pv", "vv"));
    }

}
 

UvBolt code:

public class UvBolt extends BaseRichBolt {

    // records every distinct uvid seen so far; its size is the current uv count
    private Map<String, Integer> uvMap = new HashMap<>();

    private OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String uvid = input.getStringByField("uvid");
            List<Object> values = input.getValues();

            if (uvMap.containsKey(uvid)) {
                // visitor already seen, the uv count stays the same
                values.add(uvMap.size());
                collector.emit(values);
                collector.ack(input);
            } else {
                // first time this visitor appears: record it and emit the new count
                uvMap.put(uvid, 1);
                values.add(uvMap.size());
                collector.emit(values);
                collector.ack(input);
            }

        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "urlname", "uvid", "ssid", "sscount", "sstime", "cip", "pv", "vv", "uv"));
    }

}
 

WebLogTopology code (updated):

public class WebLogTopology {
    public static void main(String[] args) throws Exception {

        BrokerHosts hosts = new ZkHosts("192.168.234.21:2181,192.168.234.22:2181,192.168.234.23:2181");

        SpoutConfig conf = new SpoutConfig(hosts, "weblog", "/weblog", UUID.randomUUID().toString());
        conf.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout spout = new KafkaSpout(conf);

        PrintBolt printBolt = new PrintBolt();
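        // The original post is cut off at this point. The wiring below is a minimal
        // completion sketch (an assumption): it chains the bolts from this section with
        // shuffle grouping, mirroring the earlier local-cluster example.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Weblog_spout", spout);
        builder.setBolt("Clear_Bolt", new ClearBolt()).shuffleGrouping("Weblog_spout");
        builder.setBolt("Pv_Bolt", new PvBolt()).shuffleGrouping("Clear_Bolt");
        builder.setBolt("Vv_Bolt", new VvBolt()).shuffleGrouping("Pv_Bolt");
        builder.setBolt("Uv_Bolt", new UvBolt()).shuffleGrouping("Vv_Bolt");
        builder.setBolt("Print_Bolt", printBolt).shuffleGrouping("Uv_Bolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Weblog_Topology", new Config(), builder.createTopology());
    }
}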
                            