前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink从Kafka到Kafka

Flink从Kafka到Kafka

原创
作者头像
用户3145449
修改2020-05-06 18:00:54
3.1K2
修改2020-05-06 18:00:54
举报
文章被收录于专栏:Flink落地Flink落地

为什么要写这篇文章?

Flink出来已经好几年了,现在release版本已经发布到1.10.0(截止2020-05-05),统一了批处理和流处理,很多大公司也都用到生实际务中,跑得也很high。这些大家都知道,但是当我开始考虑怎么在工作中落地flink的时候,我不知道怎么入手。公司比较小,目前没有实时计算,但是etl任务跑得比较慢,效率上有些跟不上。我的思路是想先试着用Flink来处理一些离线任务,看看能不能提升效率,同时为落地实时计算做准备。全网找了半天资料,文章倒是很多,包括一些付费资源,大部分的实例代码都跑不通,真的是跑不通。当然有部分原因是因为我对flink了解太少,但是完整的跑通除了word count之外的代码不应该是一件比较麻烦的事。

功能说明

1.生成json格式数据写入kafka topic1

2.消费topic1中的消息,写入topic2

目的很简单,如果要落地到具体业务免不了需要做多次的数据处理,Flink虽说是可以做批处理,但是支持得最好的还是流数据,确切的说是kafka的数据,跑通了这个流程,实际上Flink的落地就只差业务逻辑了,现在有Flink SQL,实现业务逻辑也是分分钟的事。

代码

其实只有4个文件

代码语言:javascript
复制
├── flink-learn-kafka-sink.iml
├── pom.xml
└── src
    ├── main
    │?? ├── java
    │?? │?? └── org
    │?? │??     └── apache
    │?? │??         └── flink
    │?? │??             └── learn
    │?? │??                 ├── Sink2Kafka.java
    │?? │??                 ├── model
    │?? │??                 │?? └── FamilyMemberTemperatureRecord.java
    │?? │??                 └── utils
    │?? │??                     ├── GsonUtil.java
    │?? │??                     └── KafkaGenDataUtil.java
    │?? └── resources
    └── test
        └── java

pom依赖

代码语言:javascript
复制
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!--  json 处理 -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>

        <!--  kafka连接器 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--  kafka 客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>
    </dependencies>

model

新冠肺炎影响身边每一个人,举了一个测体温记录测例子

代码语言:java
复制
package org.apache.flink.learn.model;

public class FamilyMemberTemperatureRecord {

    private int id;  // 测量次数
    private String name;    // 姓名
    private String temperature;    // 体温
    private String measureTime;    // 测量时间

    public FamilyMemberTemperatureRecord(int id, String name, String temperature, String measureTime) {
        this.id = id;
        this.name = name;
        this.temperature = temperature;
        this.measureTime = measureTime;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTemperature() {
        return temperature;
    }

    public void setTemperature(String temperature) {
        this.temperature = temperature;
    }

    public String getMeasureTime() {
        return measureTime;
    }

    public void setMeasureTime(String measureTime) {
        this.measureTime = measureTime;
    }
}

json工具类

将对象解析为json格式的数据发给kafka

代码语言:java
复制
package org.apache.flink.learn.utils;

import com.google.gson.Gson;
import java.nio.charset.Charset;

/**
 * Desc: json工具类
 * Created by suddenly on 2020-05-05
 */
 
public class GsonUtil {
    private final static Gson gson = new Gson();

    public static <T> T fromJson(String value, Class<T> type) {
        return gson.fromJson(value, type);
    }

    public static String toJson(Object value) {
        return gson.toJson(value);
    }

    public static byte[] toJSONBytes(Object value) {
        return gson.toJson(value).getBytes(Charset.forName("UTF-8"));
    }
}

数据生成工具类

代码语言:java
复制
package org.apache.flink.learn.utils;

import org.apache.flink.learn.model.FamilyMemberTemperatureRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.commons.lang3.RandomUtils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Desc: 生成数据,写到kafka中
 * Created by suddenly on 2020-05-05
 */
 
public class KafkaGenDataUtil {
        private static final String broker_list = "localhost:9092";
        private static final String topic = "tempeature-source";    // 数据源topic 

        public static void genDataToKafka() throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", broker_list);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<>(props);
            try {
                for (int i = 1; i <= 100; i++) {
                    Date currentTime = new Date();
                    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    String dateString = formatter.format(currentTime);  // 测量时间
                    Double body_tempeature = (int)(RandomUtils.nextDouble(36.0,38.5)*10)/10.0;  // 体温
                    FamilyMemberTemperatureRecord patient = new FamilyMemberTemperatureRecord(i, "suddenly",  String.valueOf(body_tempeature), dateString);
                    ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, GsonUtil.toJson(patient));
                    producer.send(record);
                    System.out.println("记录体温: " + GsonUtil.toJson(patient));
                    Thread.sleep(3 * 1000);
                }
            }catch (Exception e){
            }
            producer.flush();
        }
        public static void main(String[] args) throws InterruptedException {
            genDataToKafka();
        }
}

处理代码

代码语言:java
复制
package org.apache.flink.learn;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import java.util.Properties;

/**
 * Desc: 从kafka中读数据,写到另一个kafka topic中
 * Created by suddenly on 2020-05-05
 */
 
public class Sink2Kafka {
    private static final String SOURCE_TOPIC = "tempeature-source"; // 数据源topic,从这里读数据
    private static final String SINK_TOPIC = "tempeature-sink";     // 什么都不做,数据读出来之后直接写到这个目标topic
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "tempeature-measure-group");   // 这个随便起个名,没具体研究有什么用,我也是初学,先不用太在意这些细节
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        // 从source读数据
        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                SOURCE_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "tempeature-measure-group");
        //  写到sink里
        student.addSink(new FlinkKafkaProducer011<>(
                "localhost:9092",
                SINK_TOPIC,
                new SimpleStringSchema()
        )).name("flink-connectors-kafka")
                .setParallelism(5);
        env.execute("flink learning connectors kafka");
    }
}

运行效果

生成数据
生成数据
消费数据
消费数据
查看kafka source和sink topic中的数据
查看kafka source和sink topic中的数据

到此,我们实现了生成数据写到kafka,再把kafka的数据消费后,发到另一个kafka中。

扩展

思考一下,上面的处理过程怎么用到离线业务中

1.把数据生成部分换成离线业务的数据源

2.把转发部分的逻辑改成数据清洗逻辑,离线任务就变成准实时任务了(比如原来按天调度的任务,可以先改成按小时读数据,数据延时就从24小时变成1小时了,进步还是不小的)

3.如果未来离线要改为实时,实时数据肯定也是走消息队列,假设就是kafka,那生成的源数据直接打到data source中就可以了,处理逻辑基本不需要作修改

怎么运行

1.kafka肯定是要安装的

2.上面的例子直接在idea中运行的,代码copy下就可以,如果报错的话,需要把flink-dist的包添加到idea的依赖里,如果你也是mac,/usr目录被隐藏了,添加目录的时候选择Macintosh HD,再按commond + shift + .就能显示隐藏目录了

idea添加flink基础依赖
idea添加flink基础依赖

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要写这篇文章?
  • 功能说明
  • 代码
  • 运行效果
  • 扩展
  • 怎么运行
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com