大数据Flink大屏实时计算深度剖析 - Go语言中文社区

大数据Flink大屏实时计算深度剖析



1. 实时计算应用场景

1.1 智能推荐

什么是智能推荐?
定义: 根据用户行为习惯所提供的数据, 系统提供策略模型,自动推荐符合用户行为的信息。
例举:
比如根据用户对商品的点击数据(时间周期,点击频次), 推荐类似的商品;
根据用户的评价与满意度, 推荐合适的品牌;
根据用户的使用习惯与点击行为,推荐类似的资讯。
应用案例:

在这里插入图片描述

1.2 实时数仓

什么是实时数仓
数据仓库(Data Warehouse),可简写为DW或DWH,是一个庞大的数据存储集合,通过对各种业务数
据进行筛选与整合,生成企业的分析性报告和各类报表,为企业的决策提供支持。实时仓库是基于
Storm/Spark(Streaming)/Flink等实时处理框架,构建的具备实时性特征的数据仓库。

应用案例
分析物流数据, 提升物流处理效率。
在这里插入图片描述
阿里巴巴菜鸟网络实时数仓设计:
在这里插入图片描述
数仓分层处理架构(流式ETL):
ODS -> DWD -> DWS -> ADS
ODS(Operation Data Store):操作数据层, 一般为原始采集数据。
DWD(Data Warehouse Detail) :明细数据层, 对数据经过清洗,也称为DWI。
DWS(Data Warehouse Service):汇总数据层,基于DWD层数据, 整合汇总成分析某一个主题域的服
务数据,一般是宽表, 由多个属性关联在一起的表, 比如用户行为日志信息:点赞、评论、收藏等。
ADS(Application Data Store): 应用数据层, 将结果同步至RDS数据库中, 一般做报表呈现使用。
在这里插入图片描述

1.3 大数据分析应用

  1. IoT数据分析
  1. 什么是IoT
    物联网是新一代信息技术,也是未来发展的趋势,英文全称为: Internet of things(IOT),顾名
    思义, 物联网就是万物相联。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛
    应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。
  2. 应用案例
    物联网设备运营分析:
    在这里插入图片描述
    华为Iot数据分析平台架构:
    在这里插入图片描述
  1. 智慧城市
    城市中汽车越来越多, 川流不息,高德地图等APP通过技术手段采集了越来越多的摄像头、车流
    的数据。
    但道路却越来越拥堵,越来越多的城市开始通过大数据技术, 对城市实行智能化管理。
    2018年, 杭州采用AI智慧城市,平均通行速度提高15%,监控摄像头日报警次数高达500次,识
    别准确率超过92%,AI智慧城市通报占全体95%以上,在中国城市交通堵塞排行榜, 杭州从中国
    第5名降至57名。
    在这里插入图片描述
    在这里插入图片描述
  2. 金融风控
    风险是金融机构业务固有特性,与金融机构相伴而生。金融机构盈利的来源就是承担风险的风险溢
    价。
    金融机构中常见的六种风险:市场风险、信用风险、流动性风险、操作风险、声誉风险及法律风
    险。其中最主要的是市场风险和信用风险。
    线上信贷流程,通过后台大数据系统进行反欺诈和信用评估:在这里插入图片描述
  3. 电商行业
    用户在电商的购物网站数据通过实时大数据分析之后, 通过大屏汇总展示, 比如天猫的双11购物
    活动,通过大屏, 将全国上亿买家的订单数据可视化,实时性的动态展示,包含总览数据,流式
    TopN数据,多维区域统计数据等,极大的增强了对海量数据的可读性。
    在这里插入图片描述
    TopN排行:
    在这里插入图片描述

2 Flink快速入门

大数据Flink概述
大数据Flink入门案例

3. Flink接入体系

3.1 Flink Connectors

Flink 连接器包含数据源输入与汇聚输出两部分。Flink自身内置了一些基础的连接器,数据源输入包含文件、目录、Socket以及 支持从collections 和 iterators 中读取数据;汇聚输出支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
官方地址
Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统:

Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统:

常用的是Kafka、ES、HDFS以及JDBC。

3.2 JDBC(读/写)

Flink Connectors JDBC 如何使用?
功能: 将集合数据写入数据库中

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>

代码:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;


public class JDBCConnectorApplication {
    public static void main(String[] args)throws Exception {
        // 1. 创建运行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 创建集合数据
        List<String> list = Arrays.asList(
                "192.168.116.141\t1601297294548\tPOST\taddOrder",
                "192.168.116.142\t1601297294549\tGET\tgetOrder"
        );
// 3. 读取集合数据,写入数据库
        env.fromCollection(list).addSink(JdbcSink.sink(
// 配置SQL语句
                "insert into t_access_log(ip, time, type, api) values(?, ?, ?, ?)",
        new JdbcStatementBuilder<String>() {
            @Override
            public void accept(PreparedStatement preparedStatement,
                               String s) throws SQLException {
                System.out.println("receive ====> " + s);
// 解析数据
                String[] elements = String.valueOf(s).split("\t");
                for (int i = 0; i < elements.length; i++) {
// 新增数据
                    preparedStatement.setString(i+1, elements[i]);
                }
            }
        },
// JDBC 连接配置
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.116.141:3306/flink?useSSL=false")
                                        .withDriverName("com.mysql.jdbc.Driver")
                                        .withUsername("root")
                                        .withPassword("123456")
                                        .build()
                        ));
// 4. 执行任务
        env.execute("jdbc-job");
    }
}

数据表:

DROP TABLE IF EXISTS `t_access_log`; 
CREATE TABLE `t_access_log` ( 
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID', 
`ip` varchar(32) NOT NULL COMMENT 'IP地址', 
`time` varchar(255) NULL DEFAULT NULL COMMENT '访问时间', 
`type` varchar(32) NOT NULL COMMENT '请求类型', 
`api` varchar(32) NOT NULL COMMENT 'API地址', 
PRIMARY KEY (`id`) 
) ENGINE = InnoDB AUTO_INCREMENT=1; 

自定义写入数据源
功能:读取Socket数据, 采用流方式写入数据库中。
代码:

public class CustomSinkApplication {
    public static void main(String[] args)throws Exception {
        // 1. 创建运行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取Socket数据源
        DataStreamSource<String> socketTextStream =
                env.socketTextStream("192.168.116.141", 9911, "\n");
// 3. 转换处理流数据
        SingleOutputStreamOperator<AccessLog> outputStreamOperator =
                socketTextStream.map(new MapFunction<String, AccessLog>() {
                    @Override
                    public AccessLog map(String s) throws Exception {
                        System.out.println(s);
// 根据分隔符解析数据
                        String[] elements = s.split("\t");
// 将数据组装为对象
                        AccessLog accessLog = new AccessLog();
                        accessLog.setNum(1);
                        for (int i = 0; i < elements.length; i++) {
                            if (i == 0) accessLog.setIp(elements[i]);
                            if (i == 1) accessLog.setTime(elements[i]);
                            if (i == 2) accessLog.setType(elements[i]);
                            if (i == 3) accessLog.setApi(elements[i]);
                        }
                        return accessLog;
                    }
                });
// 4. 配置自定义写入数据源
        outputStreamOperator.addSink(new MySQLSinkFunction());
// 5. 执行任务
        env.execute("custom jdbc sink");
    }

自定义数据源

   private static class MySQLSinkFunction extends RichSinkFunction<AccessLog>{

        private Connection connection;

        private PreparedStatement preparedStatement;
        @Override
        public void open(Configuration parameters) throws Exception {
            String url="jdbc:mysql://192.168.11.14:3306/flik?useSSL=fales";
            String username="admin";
            String password="admin";

            connection= DriverManager.getConnection(url,username,password);
            String sql="insert  into xxx_log(ip,time,type,api) valuse(?,?,?,?)"
            preparedStatement=connection.prepareStatement(sql);
        }

        @Override
        public void close() throws Exception {
            try {
                if (null==connection)connection.close();
                connection=null;
            }catch (Exception e){
                e.printStackTrace();
            }
        }


        @Override
        public void invoke(AccessLog accessLog, Context context) throws Exception {
           preparedStatement.setString(1,accessLog.getIp());
           preparedStatement.setString(2,accessLog.getTime());
           preparedStatement.setString(3,accessLog.getType());
           preparedStatement.setString(4,accessLog.getApi());
           preparedStatement.execute();
        }
    }

AccessLog:

@Data
public class AccessLog {
    /**
     * IP地址 
     */
    private String ip;
    /**
     * 访问时间 
     */
    private String time;
    /**
     * 请求类型 
     */
    private String type;
    /**
     * API地址 
     */
    private String api;
    private Integer num;


}

测试数据:注意 \t

192.168.116.141 1603166893313 GET getOrder 
192.168.116.142 1603166893314 POST addOrder 

自定义读取数据源
功能: 读取数据库中的数据, 并将结果打印出来。
代码:

   public static void main(String[] args) {
        // 1. 创建运行环境 
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 配置自定义MySQL读取数据源 
        DataStreamSource<AccessLog> streamSource = env.addSource(new
                MySQLSourceFunction());
// 3. 设置并行度 
        streamSource.print().setParallelism(1);
// 4. 执行任务 
        env.execute("custom jdbc source");

    }

3.3 HDFS(读/写)

通过Sink写入HDFS数据
功能: 将Socket接收到的数据, 写入至HDFS文件中。

依赖

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.1</version>
        </dependency>

代码:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HDFSSinkApplication {
    public static void main(String[] args) {
        // 1. 创建运行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取Socket数据源
// DataStreamSource<String> socketTextStream =
        env.socketTextStream("127.0.0.1", 9911, "\n");
        DataStreamSource<String> socketTextStream =
                env.socketTextStream("192.168.116.141", 9911, "\n");
// 3. 创建hdfs sink
        BucketingSink<String> bucketingSink = new BucketingSink<>("F:/oldlu/Flink/hdfs");
        bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
                bucketingSink.setWriter(new StringWriter())
                        .setBatchSize(5 * 1024)// 设置每个文件的大小
                        .setBatchRolloverInterval(5 * 1000)// 设置滚动写入新文件的时间
                        .setInactiveBucketCheckInterval(30 * 1000)// 30秒检查一次不写入 的文件
                        .setInactiveBucketThreshold(60 * 1000);// 60秒不写入,就滚动写入新的文件
// 4. 写入至HDFS文件中
        socketTextStream.addSink(bucketingSink).setParallelism(1);
// 5. 执行任务
        env.execute("flink hdfs source");

    }
}

数据源模拟实现:

     <!-- Netty 核心组件依赖 -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.16.Final</version>
            </dependency>
            <!-- spring boot 依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>${spring.boot.version}</version>
            </dependency>
            <!-- Spring data jpa 组件依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
                <version>${spring.boot.version}</version>
            </dependency>
            <!-- mysql-connector-java -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.jdbc.version}</version>
            </dependency>
            <!-- Redis 缓存依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
     
                        
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/ZGL_cyy/article/details/124864620
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2023-01-03 18:32:37
  • 阅读 ( 235 )
  • 分类:大数据

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢