
Flink for Beginners (3): Data Sink Explanation and Example


1. Concept:

 A Data Sink is the final stage applied to a stream after Flink has processed it: it writes ("sinks") the computed results to some destination (Kafka, ElasticSearch, Socket, RabbitMQ, JDBC, Cassandra POJO, File, Print, and so on). Simply put, it is where the stream ends up.

2. The main class involved (SinkFunction)

From its implementing classes you can see that it covers every destination mentioned above.

The key method is invoke().
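A simplified sketch of the interface (exact signatures and default methods vary across Flink versions):

// Simplified sketch of org.apache.flink.streaming.api.functions.sink.SinkFunction
public interface SinkFunction<IN> extends java.io.Serializable {

    // Called once for every element of the stream; write the element
    // to the external system (console, MySQL, Kafka, ...) here.
    void invoke(IN value, Context context) throws Exception;

    // Gives access to time information about the current element
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}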

The simplest sink is, of course, printing to the console.

As in Part 2 of this series, we can take the resulting stream and simply call addSink on it, as shown below:
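A minimal sketch, assuming a socket text source just so there is a stream to sink (the source, host and port are placeholders, not part of the original example); print() and addSink(new PrintSinkFunction<>()) are equivalent:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class PrintSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source, only here so we have a stream to sink
        DataStream<String> text = env.socketTextStream("localhost", 9000);

        // The two lines below do the same thing: print() simply adds a
        // PrintSinkFunction to the stream for you.
        text.print();
        text.addSink(new PrintSinkFunction<>());

        env.execute("print sink demo");
    }
}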

3. MySQL insert example (Kafka as source, MySQL as sink)

  1. Create the Student entity class

  

import lombok.Data;

/**
 * @author zhouxl
 * Student entity class
 */
@Data
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "{" +
                "id:" + id +
                ", name:'" + name + ''' +
                ", password:'" + password + ''' +
                ", age:" + age +
                '}';
    }
    
}

 2. Create a Kafka producer

import com.alibaba.fastjson.JSON;
import com.tuya.mysqlDemo.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

public class SinkFromKafka {

    public static final String broker_list = "kafka地址";

    public static final String topic = "flink_demo";

    public static void writeToKafka(Integer count) throws InterruptedException {

        // Producer configuration; see the ProducerConfig source or the official docs for all options
        Map<String, Object> config = new HashMap<String, Object>(16);
        // Kafka broker address
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_list);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");

        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024 * 5);


        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        Student student = new Student(count, "zhouxl" + count, "password", count);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
        Future<RecordMetadata> future = producer.send(record);
        producer.flush();
        try {
            future.get();
            System.out.println("sent " + future.isDone() + ", data: " + JSON.toJSONString(student));
        } catch (Exception e) {
            e.printStackTrace();
        }

    }


    /**
     * Simulate sending data to Kafka every 5 seconds
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        Integer count = 1;
        while (true) {
            Thread.sleep(5000);
            writeToKafka(count++);
        }

    }
}

3. Flink stream processing

import com.alibaba.fastjson.JSON;
import com.tuya.mysqlDemo.Student;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

/**
 * @author zhouxl
 * Consumes messages from Kafka
 */
public class Main {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();

        props.put("bootstrap.servers","kafka地址");


        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","latest");

        SingleOutputStreamOperator<Student> student =env.addSource(
                new FlinkKafkaConsumer<>("flink_demo",
                        // deserialize the Kafka records as plain strings
                        new SimpleStringSchema(),
                        props
                )
        ).setParallelism(1).map(string-> JSON.parseObject(string,Student.class));

        /**
         * Option 1:
         * print the data read from Kafka to the console (the two lines below are equivalent)
         */
//        student.print();
//        student.addSink(new PrintSinkFunction<>());

        /**
         * Option 2:
         * write the Kafka data to the database
         */
        student.addSink(new SinkToMysql());

        // Without this call the job is never actually submitted and executed
        env.execute("flink kafka to mysql sink");
    }
}

4. Handle the sink destination

import com.tuya.mysqlDemo.Student;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.flink.configuration.Configuration;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author zhouxl
 * Handles where the Flink stream goes (the sink)
 */
public class SinkToMysql extends RichSinkFunction<Student> {

    PreparedStatement ps;

    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }


    /**
     * Called once when the job finishes: close the statement and the connection
     * to release resources
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        // close the statement before the connection
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }


    /**
     * invoke() is called once for every record, so each record triggers one insert
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        // fill in the statement parameters and execute the insert
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://172.16.248.10:3306/flink_demo?characterEncoding=utf-8&useUnicode=true&autoReconnect=true&allowMultiQueries=true", "root", "root");
        } catch (Exception e) {
            System.out.println("MySQL connection error: " + e.getMessage());
        }
        return con;
    }
}

5. Run SinkFromKafka

Start the Main class (the Flink job) as well, then check the database.

ok!
Copyright notice: this article originally appeared on CSDN; thanks to the original author. It is licensed under CC 4.0 BY-SA; please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_40650378/article/details/103834520