airflow是Airbnb开源的一个用python编写的调度工具 基于有向无环图(DAG) airflow可以定义一组有依赖的任务 按照依赖依次执行 通过python代码定义子任务 并支持各种Operate操作器 灵活性大 能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务
一、环境准备Python 2.7.5??PyODPS支持Python2.6以上版本Airflow?apache-airflow-1.10.71.安装MaxCompute需要的包pip install setuptools 3.0
pip install requests 2.4.0
pip install greenlet 0.4.10 ?# 可选 安装后能加速Tunnel上传。
pip install cython 0.19.0 ?# 可选 不建议Windows用户安装。
pip install pyodps
注意 如果requests包冲突 先卸载再安装对应的版本
2.执行如下命令检查安装是否成功python -c from odps import ODPS
# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding( utf8 )
#修改系统默认编码。
# MaxCompute参数设置
options.sql.settings { options.tunnel.limit_instance_tunnel : False, odps.sql.allow.fullscan : True}
cfg ConfigParser()
cfg.read( odps.ini )
print(cfg.items())
odps ODPS(cfg.get( odps , access_id ),cfg.get( odps , secret_access_key ),cfg.get( odps , project ),cfg.get( odps , endpoint ))
default_args {
? ? owner : airflow ,
? ? depends_on_past : False,
? ? retry_delay : timedelta(minutes 5),
? ? start_date :datetime(2020,1,15)
? ? # email : [ airflow example.com ],
? ? # email_on_failure : False,
? ? # email_on_retry : False,
? ? # retries : 1,
? ? # queue : bash_queue ,
? ? # pool : backfill ,
? ? # priority_weight : 10,
? ? # end_date : datetime(2016, 1, 1),
}
dag DAG(
? ? Airiflow_MC , default_args default_args, schedule_interval timedelta(seconds 30))
def read_sql(sqlfile):
? ? with io.open(sqlfile, encoding utf-8 , mode r ) as f:
? ? ? ? sql f.read()
? ? f.closed
? ? return sql
def get_time():
? ? print 当前时间是{} .format(time.time())
? ? return time.time()
def mc_job ():
? ? project odps.get_project() ?# 取到默认项目。
? ? instance odps.run_sql( select * from long_chinese; )
? ? print(instance.get_logview_address())
? ? instance.wait_for_success()
? ? with instance.open_reader() as reader:
? ? ? ? count reader.count
? ? print( 查询表数据条数 {} .format(count))
? ? for record in reader:
? ? ? ? print record
? ? return count
t1 PythonOperator (
? ? task_id get_time ,
? ? provide_context False ,
? ? python_callable get_time,
? ? dag dag )
t2 PythonOperator (
? ? task_id mc_job ,
? ? provide_context False ,
? ? python_callable mc_job ,
? ? dag dag )
t2.set_upstream(t1)
python Airiflow_MC.py
3.进行测试# print the list of active DAGs
airflow list_dags
# prints the list of tasks the tutorial dag_id
airflow list_tasks Airiflow_MC
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks Airiflow_MC --tree
#测试task
airflow test Airiflow_MC get_time 2010-01-16
airflow test Airiflow_MC mc_job 2010-01-16
4.运行调度任务登录到web界面点击按钮运行
5.查看任务运行结果1.点击view log
2.查看结果
大家如果对MaxCompute有更多咨询或者建议 欢迎扫码加入 MaxCompute开发者社区钉钉群 或点击链接 申请加入。
最近,DevOps的采用导致了企业计算的重大转变。除无服务器计算,动态配置和即付...
在TOP云(zuntop.com)科技租赁过服务器的站长都知道独立服务器在价格上比VPS主...
本文转载自网络,原文链接:https://mp.weixin.qq.com/s/vlOUg46B5bcmToX-fjavJQ...
9月17日,2020云栖大会上,阿里云正式发布工业大脑3.0。 阿里云智能资深产品专家...
中国最?好的一朵云飘进了华瑞银行。阿里云将进一步助力华瑞银行All in Cloud。 -...
定义 this是函数运行时自动生成的内部对象,即调用函数的那个对象。(不一定很准...
很长时间没有更新原创文章了,但是还一直在思考和沉淀当中,后面公众号会更频繁...
2020年对于云计算行业来说是突破性的一年,因为公共云供应商增加了收入,而疫情...
一、PostgreSQL行业位置 一 行业位置 首先我们看一看RDS PostgreSQL在整个行业当...
查看表结构,sbtest1有主键、k_1二级索引、i_c二级索引 CREATE TABLE `sbtest1` ...