前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MRS Flink使用SQL-Client对接Hive

MRS Flink使用SQL-Client对接Hive

原创
作者头像
玖柒的小窝
修改2021-09-30 10:21:18
1.1K0
修改2021-09-30 10:21:18
举报
文章被收录于专栏:各类技术文章~各类技术文章~

一、准备环境

1.根据产品文档安装Flink客户端;

2.将sql-client-defaults.yaml放入/opt/client/Flink/flink/conf中

3.将jaas.conf放入/opt/client/Flink/flink/conf中

Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=false useTicketCache=true debug=false; };

4.添加sql-client.sh中添加在JVM_ARGS参数:

JVM_ARGS="-Djava.security.auth.login.config=/opt/client/Flink/flink/conf/jaas.conf $JVM_ARGS"

二、启动Flink集群

例如:yarn-session.sh -t ssl -d

三、启动SQL-Client

./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml

四、运行SQL

CREATE TABLE kafkaSourceTable ( ? order_id VARCHAR, ? shop_id VARCHAR, ? member_id VARCHAR, ? trade_amt DOUBLE ) WITH ( ? 'connector.type' = 'kafka', ? 'connector.version' = 'universal', ? 'connector.topic' = 'order_sql', ? 'connector.properties.bootstrap.servers' = '10.162.147.217:21005', ? 'connector.properties.zookeeper.connect' = '10.162.147.217:24002', ? 'connector.properties.group.id' = 'test-consumer-group', ? 'connector.startup-mode' = 'latest-offset', ? 'format.type' = 'json' );

CREATE TABLE kafkaSinkTable(shop_id VARCHAR, member_id VARCHAR) WITH ( ? 'connector.type' = 'kafka', ? 'connector.version' = 'universal', ? 'connector.topic' = 'order_sql', ? 'connector.properties.bootstrap.servers' = '10.162.147.217:21005', ? 'connector.properties.zookeeper.connect' = '10.162.147.217:24002', ? 'update-mode' = 'append', ? 'format.type' = 'json' ); INSERT INTO ? kafkaSinkTable SELECT ? shop_id, ? member_id FROM ? kafkaSourceTable; SELECT ? shop_id, ? member_id FROM ? kafkaSourceTable;

五、对接Hive

1)修改sql-client-defaults.yaml

catalogs: ????- name: myhive ?????type: hive ????hive-conf-dir: /opt/clienrc5/Hive/config ????hive-version: 3.1.0

2)在/opt/clienrc5/Hive/config/hive-site.xml添加配置

<property> <name>hive.metastore.sasl.enabled</name> <value>true</value> </property>

3)启动sql-client

use catalog myhive;

SET table.sql-dialect=hive;

CREATE TABLE IF NOT EXISTS hive_dialect_tbl ( `id` int , `name` string , `age` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; SET table.sql-dialect=default;

CREATE TABLE datagen ( `id` int , `name` string , `age` int ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1' ); INSERT INTO hive_dialect_tbl SELECT * FROM datagen; select * from hive_dialect_tbl;

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二、启动Flink集群
  • 三、启动SQL-Client
  • 四、运行SQL
  • 五、对接Hive
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com