前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据Kudu(十):Flink操作Kudu

大数据Kudu(十):Flink操作Kudu

原创
作者头像
Lansonli
发布2022-12-30 18:03:18
1.2K0
发布2022-12-30 18:03:18
举报
文章被收录于专栏:Lansonli技术博客

?Flink操作Kudu

Flink主要应用场景是流式数据处理上,有些公司针对流式数据使用Flink实时分析后将结果存入Kudu,例如快手公司。这里将实时计算的结果存入Kudu需要自定义Flink Kudu Sink。

场景:Flink实时读取Socket数据,将结果存入Kudu表t_flink_result,为了方便操作不再创建Kudu外表,这里在Impala中创建Kudu内表t_flink_result:

代码语言:javascript
复制
create table t_flink_result
(
	id  int,
	name string,
	age int,
	primary key (id)
)
partition by hash partitions 3
stored as kudu
tblproperties(
 'kudu.master_address' = 'cm1:7150,cm2:7150'
)

在Maven中导入以下Flink 包依赖:

代码语言:javascript
复制
<!-- Flink 开发Scala需要导入以下依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

Flink 自定义KuduSink 代码如下:

代码语言:javascript
复制
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.socketTextStream("cm3",9999)

//自定义KuduSink
ds.addSink(new RichSinkFunction[String] {
  //初始化连接Kudu对象
  var kuduClient :KuduClient = _

  //Kudu 表对象
  var kuduTable :KuduTable = _

  //创建KuduSession 客户端会话
  var session: KuduSession = _

  //初始化时调用一次,这里初始化连接Kudu的对象
  override def open(parameters: Configuration): Unit = {
    kuduClient = new KuduClientBuilder("cm1:7051,cm2:7051").build()
    kuduTable = kuduClient.openTable("impala::default.t_flink_result")
    session = kuduClient.newSession()
    //设置插入数据策略
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
  }

  //来一条数据这里调用一次invoke方法
  override def invoke(one: String, context: SinkFunction.Context[_]): Unit = {
    val arr: Array[String] = one.split(",")
    val id: Int = arr(0).toInt
    val name: String = arr(1)
    val age: Int = arr(2).toInt

    //准备插入的数据
    val insert: Insert = kuduTable.newInsert()
    val row: PartialRow = insert.getRow
    row.addInt("id",id)
    row.addString("name",name)
    row.addInt("age",age)

    //插入到Kudu表中
    session.apply(insert)
  }

  //当Flink 关闭时调用一次,回收连接对象
  override def close(): Unit ={
    session.close()
    kuduClient.close()
  }
})

env.execute()

启动以上Flink 代码,打开Socket 服务器,输入数据,可以在impala 中查询表t_flink_result数据,数据被写入。

?

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ?Flink操作Kudu
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com