本文介绍使用Python SDK的详细流程,包括环境准备、SDK获取和安装和快速使用三部分。
环境准备
- 要使用阿里云Python SDK,您需要一个云账号以及一对
AccessKey ID
和AccessKey Secret
。请在阿里云控制台中的AccessKey管理页面上创建和查看您的AccessKey,或者联系您的系统管理员。 - 要使用阿里云SDK访问某个产品的API,您需要事先在阿里云控制台 中开通这个产品。
SDK获取和安装
使用pip安装(推荐)
pip install aliyun-python-sdk-core # 安装阿里云SDK核心库
pip install aliyun-python-sdk-fnf # 安装Serverless工作流SDK
快速使用
下文将以创建一个流程,发起一次执行并获取执行详情为例展示如何使用Python SDK调用Serverless工作流服务。
请求方式
# encoding: utf-8
import time
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkfnf.request.v20190315 import CreateFlowRequest
from aliyunsdkfnf.request.v20190315 import StartExecutionRequest
from aliyunsdkfnf.request.v20190315 import DescribeExecutionRequest
from aliyunsdkfnf.request.v20190315 import GetExecutionHistoryRequest
flow_definition_type = "FDL"
flow_name = "xxx"
flow_definition = "xxx"
flow_description = "some descriptions"
role_arn = "acs:ram::${Your_Account_ID}:${Your_Role}"
execution_name = "xxx"
def create_flow(fnf_cli):
request = CreateFlowRequest.CreateFlowRequest()
request.set_Type(flow_definition_type)
request.set_Description(flow_description)
request.set_Definition(flow_definition)
request.set_RoleArn(role_arn)
request.set_Name(flow_name)
return fnf_cli.do_action_with_exception(request)
def start_execution(fnf_cli):
request = StartExecutionRequest.StartExecutionRequest()
request.set_FlowName(flow_name)
request.set_ExecutionName(execution_name)
return fnf_cli.do_action_with_exception(request)
def describe_execution(fnf_cli):
request = DescribeExecutionRequest.DescribeExecutionRequest()
request.set_FlowName(flow_name)
request.set_ExecutionName(execution_name)
return fnf_cli.do_action_with_exception(request)
def get_execution_history(fnf_cli):
request = GetExecutionHistoryRequest.GetExecutionHistoryRequest()
request.set_FlowName(flow_name)
request.set_ExecutionName(execution_name)
return fnf_cli.do_action_with_exception(request)
创建客户端并利用上述函数发起一系列调用。
def main():
# 创建AcsClient实例
client = AcsClient(
"<your-access-key-id>",
"<your-access-key-secret>",
"<your-region-id>"
)
try:
create_resp = create_flow(client)
print(create_resp)
start_resp = start_execution(client)
print(start_resp)
time.sleep(1)
desc_resp = describe_execution(client)
print(desc_resp)
get_resp = get_execution_history(client)
print(get_resp)
except ServerException as e:
print(e.get_request_id())
if __name__ == '__main__':
main()