前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

作者头像
Maynor
发布2021-12-07 10:30:38
1.8K0
发布2021-12-07 10:30:38
举报

首先准备模拟数据:

代码语言:javascript
复制
//1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka的一系列配置,可以从官网直接copy过来@~@~ 然后正式生产模拟数据:

代码语言:javascript
复制
//2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true){
            //随机生成分类和金额
            int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
            String category = categorys[index];//获取的随机分类
            double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)

            CategoryPojo categoryPojo = new CategoryPojo(category, price,System.currentTimeMillis());
            String data = JSON.toJSONString(categoryPojo);

            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("topicDemo",data));
            System.out.println("数据是"+data);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

这里的实体类用Lombok,比较简单: 这是之前写的Lombok用法文章

代码语言:javascript
复制
 @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double price;//该分类总销售额
        private long time;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }

有了数据写入Kafka,我们开始消费“她”:

设置一下Flink运行环境:

代码语言:javascript
复制
 //TODO 1.设置环境env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //并行度为1,表示不分区
        env.setParallelism(1);

配置Kafka相关并从哪里开始读offset

代码语言:javascript
复制
//TODO 2设置Kafka相关参数
        Properties props = new Properties();
        //kafka的地址,消费组名
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.88.161:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");

        //Flink设置kafka的offset,从最新的开始
         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "myDemo",
                new SimpleStringSchema(),
                props
        );
        consumer.setStartFromLatest();
        consumer.setCommitOffsetsOnCheckpoints(true);

第3步解析数据源并测试:

代码语言:javascript
复制
DataStreamSource<String> source = env.addSource(consumer);
         SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                Order order = JSON.toJavaObject(jsonObject, Order.class);
                return order;
            }
        });
        //测试一下
        mapDS.print();
在这里插入图片描述
在这里插入图片描述

success! 最后存入Mysql

代码语言:javascript
复制
//sink输出到Mysql
        result.addSink(JdbcSink.sink(
                "INSERT INTO t_order(category,price,time) values(?,?,?)",
                (ps,order)->{
                    ps.setString(1,order.category);
                    ps.setDouble(2,order.price);
                    ps.setLong(3,order.time);
                },
                //批处理
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") //jdbc
                .withUsername("root")   //配置用户名
                .withPassword("123456") //密码
                .withDriverName("com.mysql.jdbc.Driver") //驱动类
                .build()
        ));

        env.execute();

以上就是全部内容了,感谢您的阅读! 另外补充一些不成熟的代码:双流Join

代码语言:javascript
复制
//双流Join
        SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
        SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());

        //商品ID=订单ID
        final DataStream<Order> result = order1watermark.join(order2watermark)
                .where(o1 -> o1.category)
                .equalTo(o2 -> o2.category)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply((o1, o2) -> {
                    Order order = new Order();
                    order.setCategory(o1.category);
                    order.setPrice(o2.price);
                    order.setTime(o2.time);
                    return order;
                });
//        result.print();

水印机制,简化了直接使用系统时间

代码语言:javascript
复制
//水印机制
    public static class OrderItem2WaterMark implements WatermarkStrategy<Order>{

        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element,recordTimestamp)->System.currentTimeMillis();
        }
    }
    public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
本文参与?腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-07-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体同步曝光计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com