前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Beam-链路顺序

Beam-链路顺序

作者头像
Dlimeng
发布2023-06-29 16:48:53
1290
发布2023-06-29 16:48:53
举报
文章被收录于专栏:开源心路开源心路

简介

这个的介绍在我的另一篇博文中(Beam-介绍),在此不在再赘述,最近碰到个有意思的事,聊聊beam的链路,简单来说自己操作的一些函数中间有些转换组件,注册在链路中,在此截了一张官网的图片。

这是简单链路大概样子,各个函数串联在一起,当然了实际中不可能这样一帆风顺,肯定遇到很多种情况,我列下几种情况分享下。

集合注册

代码语言:javascript
复制
PipelineOptionsFactory.register(IndexerPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

        

PCollection<String> p1 = pipeline.apply(TextIO.read().from("")).apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                System.out.println(c.element().toString());
            }
        }));

PCollectionList<String> plist = PCollectionList.empty(pipeline);
plist.and(p1);
pipeline.run();

以导流的方式放到beam的集合,不断apply函数等等,形成多种链路,中间可以拆分导流集合,或者合并集合都很简单我就不说了,当然这些存储的都是计划,并没有数据,核心思想移动计算不移动数据。

错误案例1

代码语言:javascript
复制
 public static void main(String[] args) throws PropertyVetoException {

        IndexerPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(IndexerPipelineOptions.class);

        PipelineOptionsFactory.register(IndexerPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        
        String s1="insert into test values('11','11')";
        String s2="insert into test values('12','12')";
        String s3="insert into test values('13','13')";
        String s5="insert into test values('15','15')";
        String s6="insert into test values('16','16')";
        String s7="insert into test values('17','17')";
        String s4="insert into test values('14','14')";


        save(pipeline,s1);
        save(pipeline,s2);
        save(pipeline,s3);
        save(pipeline,s4);
        save(pipeline,s5);
        save(pipeline,s6);
        save(pipeline,s7);


        pipeline.run();

    }

public static void save(Pipeline pipeline,String sql) throws PropertyVetoException {
        ComboPooledDataSource cpds = new ComboPooledDataSource();
        cpds.setDriverClass("com.mysql.jdbc.Driver");
        cpds.setJdbcUrl("jdbc:mysql://xxxx:3306/bigdata?characterEncoding=utf8&useSSL=true");
        cpds.setUser("root");
        cpds.setPassword("root");

       
        Schema type =
                Schema.builder().addStringField("sass").build();
        Row build = Row.withSchema(type).addValue("123").build();

        pipeline
                .apply(Create.of(build))
                .apply(
                        JdbcIO.<Row>write()
                                .withDataSourceConfiguration(
                                        JdbcIO.DataSourceConfiguration.create(
                                                cpds))
                                .withStatement(sql)
                                .withPreparedStatementSetter(
                                        (element, statement) -> {

                                        })
                );

    }

一个简单的多语句多输出的操作,输出多个PDone(Poutput),因为在同个pipeline中分发不同的输出,又因beam集合本身是无序,注册时没有依赖关系,分发任务不会排序,所以结果乱序。这种情形会很多,比如返回很多pipeline对象再注册继续会乱序的,比如PCollection注册链路再一起多个输出也会如此结果,比如PCollectionList注册顺序后输出结果也会乱序等等,经历过很多失败。

我使用JDBCIO连接hive一些大数据体系的库,这样用beam才会用到些精髓的东西,做这些测试案例用mysql因为方便些,原理相似。

错误案例2

代码语言:javascript
复制
 Schema type = Schema.builder().addStringField("test").build();
        Row row = Row.withSchema(type).addValue("test").build();
        PCollection<Row> r1 = pipeline.apply("r1",Create.of(row));
        PCollection<Row> r2 = pipeline.apply("r2",Create.of(row));
        PCollection<Row> r3 = pipeline.apply("r2",Create.of(row));
        PCollection<Row> r4= pipeline.apply("r4",Create.of(row));
        PCollection<Row> r5 = pipeline.apply("r5",Create.of(row));
        PCollection<Row> r6= pipeline.apply("r6",Create.of(row));
        PCollection<Row> r7 = pipeline.apply("r7",Create.of(row));

        PCollectionList<Row> pl = PCollectionList.of(r1).and(r2).and(r3).and(r4).and(r5).and(r6).and(r7);


        List<PCollection<Row>> all = pl.getAll();
        for (int i = 0; i < all.size(); i++) {
            save2(all.get(i),l.get(i));
        }

这样链路输出结果依旧会乱。

正确的操作

代码语言:javascript
复制
public static void main(String[] args) throws PropertyVetoException {

IndexerPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(IndexerPipelineOptions.class);

        PipelineOptionsFactory.register(IndexerPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        Pipeline pipeline2 = Pipeline.create(options);
        Pipeline pipeline3 = Pipeline.create(options);
        Pipeline pipeline4 = Pipeline.create(options);

        String s1="insert into test values('11','11')";
        String s2="insert into test values('12','12')";
        String s3="insert into test values('13','13')";
        String s5="insert into test values('15','15')";
        String s6="insert into test values('16','16')";
        String s7="insert into test values('17','17')";
        String s4="insert into test values('14','14')";

        save(pipeline,s1).getPipeline().run();
        save(pipeline2,s2).getPipeline().run();
        save(pipeline3,s3).getPipeline().run();
        save(pipeline4,s4).getPipeline().run();

    }

其实这个用到核心思想,我在其他博文中讲到的大数据处理四大设计模式-分离处理模式(如果你在处理数据集时并不想丢弃里面的任何数据,而是想把数据分类为不同的类别进行处理时,你就需要用到分离式来处理数据。)的应用,一个pipeline解决不了,拆分多个管道处理,多次运行,分离开来,当然效率会有损害(朋友们可以思考下),我说了说一些想法,有错误踩过的坑,有正确的做法,都是积累,分享给朋友们,有更好想法交流交流。

Beam-介绍:https://blog.csdn.net/qq_19968255/article/details/96158013

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-07-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 集合注册
    • 错误案例1
      • 错误案例2
        • 正确的操作
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
        http://www.vxiaotou.com