本文介绍了如何使用任务步骤的等待回调(waitForCallback)模式集成MNS主题,并发布消息到主题。MNS主题接收到消息后,调用工作流ReportTaskSucceeded或ReportTaskFailed
API回调任务状态。
框架原理
应用部署后执行流程如下:
- 执行工作流,任务步骤发布消息到MNS主题。任务步骤的
TaskToken
会被放入消息体一起发送到主题。
- 工作流任务步骤暂停执行,等待任务回调。
- MNS主题接收到消息后,将消息和
TaskToken
通过HTTP推送发送到函数计算FC的函数HTTP触发器,触发函数执行。
- 函数计算函数最终获取到
TaskToken
,并调用ReportTaskSucceeded来报告任务状态。
- 流程继续执行。
部署应用
- 登录Serverless工作流控制台。
- 在流程页面,单击创建流程。
- 在创建流程页面,选择模板,单击下一步。
- 在创建应用页面,配置相关信息,创建模板对应的应用,并单击部署。
- 应用名称:自定义参数,同一账号下必须唯一。
- TopicName:自定义参数,如果对应MNS主题不存在会自动创建。
单击
部署后,会显示应用下创建的所有资源。
- 执行工作流。
执行以下命令。
{
"messageBody": "hello world"
}
执行成功后,您可以看到执行结果的状态。
应用代码
- 编排MNS主题的工作流。
将任务步骤回调的TaskToken
封装在消息的MessageBody
中,用于后续的回调。outputMappings
中读取ReportTaskSucceeded设置的output
。
version: v1
type: flow
steps:
- type: task
name: mns-topic-task
resourceArn: acs:mns:::/topics/<topic>/messages
pattern: waitForCallback
inputMappings:
- target: messageBody
source: $input.messageBody
- target: taskToken
source: $context.task.token
outputMappings:
- target: status
source: $local.status
serviceParams:
MessageBody: $
- 回调任务步骤的FC函数。
读取MessageBody
中封装的TaskToken
,回调任务状态设置output
为{"status":"success"}
。
def handler(environ, start_response):
# Get request body
try:
request_body_size = int(environ.get('CONTENT_LENGTH',
0))
except ValueError:
request_body_size = 0
request_body =
environ['wsgi.input'].read(request_body_size)
print('Request body:
{}'.format(request_body))
body = json.loads(request_body)
message_body_str =
body['Message']
# Read MessageBody and TaskToken from
message body
message_body =
json.loads(message_body_str)
task_token =
message_body['taskToken']
ori_message_body =
message_body['messageBody']
print('Task token: {}\norigin message
body: {}'.format(task_token, ori_message_body))
# Init fnf client use sts token
context = environ['fc.context']
creds = context.credentials
sts_creds =
StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
fnf_client =
AcsClient(credential=sts_creds, region_id=context.region)
# Report task succeeded to serverless
workflow
req =
ReportTaskSucceededRequest()
req.set_TaskToken(task_token)
req.set_Output('{"status":
"success"}')
resp =
fnf_client.do_action_with_exception(req)
print('Report task response:
{}'.format(resp))
# Response to http request
status = '200 OK'
response_headers = [('Content-type',
'text/plain')]
start_response(status,
response_headers)
return [b'OK']