当前位置:主页 > 查看内容

使用AirFlow调度MaxCompute

发布时间:2021-08-05 00:00| 位朋友查看

简介:背景 airflow是Airbnb开源的一个用python编写的调度工具 基于有向无环图(DAG) airflow可以定义一组有依赖的任务 按照依赖依次执行 通过python代码定义子任务 并支持各种Operate操作器 灵活性大 能满足用户的各种需求。本文主要介绍使用Airflow的python Opera……
背景

airflow是Airbnb开源的一个用python编写的调度工具 基于有向无环图(DAG) airflow可以定义一组有依赖的任务 按照依赖依次执行 通过python代码定义子任务 并支持各种Operate操作器 灵活性大 能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备Python 2.7.5??PyODPS支持Python2.6以上版本Airflow?apache-airflow-1.10.71.安装MaxCompute需要的包

pip install setuptools 3.0

pip install requests 2.4.0

pip install greenlet 0.4.10 ?# 可选 安装后能加速Tunnel上传。

pip install cython 0.19.0 ?# 可选 不建议Windows用户安装。

pip install pyodps

注意 如果requests包冲突 先卸载再安装对应的版本

2.执行如下命令检查安装是否成功


python -c from odps import ODPS


二、开发步骤

image

1.在Airflow家目录编写python调度脚本Airiflow_MC.py


# -*- coding: UTF-8 -*-

import sys

import os

from odps import ODPS

from odps import options

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta

from configparser import ConfigParser

import time

reload(sys)

sys.setdefaultencoding( utf8 )

#修改系统默认编码。

# MaxCompute参数设置

options.sql.settings { options.tunnel.limit_instance_tunnel : False, odps.sql.allow.fullscan : True}

cfg ConfigParser()

cfg.read( odps.ini )

print(cfg.items())

odps ODPS(cfg.get( odps , access_id ),cfg.get( odps , secret_access_key ),cfg.get( odps , project ),cfg.get( odps , endpoint ))

default_args {

? ? owner : airflow ,

? ? depends_on_past : False,

? ? retry_delay : timedelta(minutes 5),

? ? start_date :datetime(2020,1,15)

? ? # email : [ airflow example.com ],

? ? # email_on_failure : False,

? ? # email_on_retry : False,

? ? # retries : 1,

? ? # queue : bash_queue ,

? ? # pool : backfill ,

? ? # priority_weight : 10,

? ? # end_date : datetime(2016, 1, 1),

}

dag DAG(

? ? Airiflow_MC , default_args default_args, schedule_interval timedelta(seconds 30))

def read_sql(sqlfile):

? ? with io.open(sqlfile, encoding utf-8 , mode r ) as f:

? ? ? ? sql f.read()

? ? f.closed

? ? return sql

def get_time():

? ? print 当前时间是{} .format(time.time())

? ? return time.time()

def mc_job ():


? ? project odps.get_project() ?# 取到默认项目。

? ? instance odps.run_sql( select * from long_chinese; )

? ? print(instance.get_logview_address())

? ? instance.wait_for_success()

? ? with instance.open_reader() as reader:

? ? ? ? count reader.count

? ? print( 查询表数据条数 {} .format(count))

? ? for record in reader:

? ? ? ? print record

? ? return count

t1 PythonOperator (

? ? task_id get_time ,

? ? provide_context False ,

? ? python_callable get_time,

? ? dag dag )


t2 PythonOperator (

? ? task_id mc_job ,

? ? provide_context False ,

? ? python_callable mc_job ,

? ? dag dag )

t2.set_upstream(t1)


2.提交


python Airiflow_MC.py

3.进行测试


# print the list of active DAGs

airflow list_dags


# prints the list of tasks the tutorial dag_id

airflow list_tasks Airiflow_MC


# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks Airiflow_MC --tree

#测试task

airflow test Airiflow_MC get_time 2010-01-16

airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

03.png

5.查看任务运行结果

1.点击view log

04.png

2.查看结果

image



大家如果对MaxCompute有更多咨询或者建议 欢迎扫码加入 MaxCompute开发者社区钉钉群 或点击链接 申请加入。

54144fbf8bbf4f9ba7ceb09b5065f18b.png


本文转自网络,原文链接:https://developer.aliyun.com/article/786310
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐