Apache Flink学习笔记(3)Flink的案例和提交作业 - Go语言中文社区

Apache Flink学习笔记(3)Flink的案例和提交作业


Flink提交作业流程

https://blog.csdn.net/weixin_43622131/article/details/112256784

案例

maven依赖和前期准备

maven工程中的resources需要放一个hello.txt文件。
需要用nc开启一个tcp服务

https://blog.csdn.net/liutao43/article/details/115495473
	<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.12.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.12.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.12.2</version>
    </dependency>

Batch

package org.benchmark;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.ExecutionEnvironment;

// 批处理word count
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment (legacy DataSet API).
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Resolve hello.txt from the classpath. getResource() returns null when the
        // file is missing, which would otherwise surface later as an unhelpful NPE.
        java.net.URL resource = Thread.currentThread().getContextClassLoader().getResource("hello.txt");
        if (resource == null) {
            throw new IllegalStateException(
                    "hello.txt not found on the classpath (expected under src/main/resources)");
        }
        String inputPath = resource.getFile();
        DataSource<String> stringDataSource = env.readTextFile(inputPath);

        // Split each line on spaces into (word, 1) tuples, then group by the word
        // (tuple field 0) and sum the counts (tuple field 1).
        AggregateOperator<Tuple2<String, Integer>> sum = stringDataSource
                .flatMap(new MyFlatMapper())
                .groupBy(0).sum(1);

        sum.print();
    }

    /** Splits a line on single spaces and emits one (word, 1) tuple per word. */
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = -5224012503623543819L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    @Test
    public void test() {
        // Sanity check: print the resolved classpath location of hello.txt.
        System.out.println(Thread.currentThread().getContextClassLoader().getResource("hello.txt").getFile());
    }
}

stream

并行度优先级(高到低):算子上设置的并行度 > env 设置的并行度 > 客户端提交时指定的并行度 > 配置文件中默认的并行度

package org.benchmark;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Env-level parallelism (operator-level settings below override it):
        // env.setParallelism(2);

        // Reading from a file instead of a socket:
        //  String inputPath = Thread.currentThread().getContextClassLoader().getResource("hello.txt").getFile();
        //  DataStreamSource<String> stringDataSource = env.readTextFile(inputPath);

        // Extract host/port from the program arguments (--host <ip> --port <port>).
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        if (host == null) {
            // get() returns null for a missing key; fail fast with a clear message
            // instead of passing null into socketTextStream.
            throw new IllegalArgumentException("Missing required argument: --host <hostname>");
        }
        // getInt() already throws with a descriptive message if --port is absent or malformed.
        int port = parameterTool.getInt("port");

        // Use `nc` to provide a server and read a text stream from that socket.
        DataStreamSource<String> stringDataSource = env.socketTextStream(host, port);

        // Split on spaces into (word, 1) tuples, key by the word, keep a running sum.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stringDataSource
                .flatMap(new MyFlatMapper())
                .keyBy((Tuple2<String, Integer> value) -> value.f0)
                .sum(1).setParallelism(2);
        sum.print().setParallelism(1);

        // The job graph is built lazily; nothing runs until execute() is called.
        env.execute("StreamWordCount");
    }

    /** Splits a line on single spaces and emits one (word, 1) tuple per word. */
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 7883096705374505894L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

提交作业方式

1.命令行提交

flink run -c org.benchmark.StreamWordCount ./flink-benchmark-1.0-SNAPSHOT.jar --host ip --port 端口 >> /dev/null & 

flink cancel job_id		## 通过flink list查看

2. web界面提交

(原文此处为 Flink Web UI 提交作业的截图,图片未能提取)

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/liutao43/article/details/115522760
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢