当前位置:主页 > 查看内容

任务类型 - 批量计算

发布时间:2021-09-19 00:00| 位朋友查看

简介:1. 背景 ? 一个作业(Job)由一组任务(Task)及其依赖关系组成,每个任务可以有一个或多个执行实例(Instance)。具体详情看 名词解释 。目前的任务类型分为两种:并发任务和 DAG(Directed Acyclic Graph) 任务。 2. 任务概述 2.1 并发任务 作业中的一个任……

1. 背景

? 一个作业(Job)由一组任务(Task)及其依赖关系组成,每个任务可以有一个或多个执行实例(Instance)。具体详情看名词解释。目前的任务类型分为两种:并发任务和 DAG(Directed Acyclic Graph) 任务。

2. 任务概述

2.1 并发任务

作业中的一个任务可以指定在多个实例上运行程序,这些实例运行的任务程序都是一样的,但是可以处理不同的数据。

2.2 DAG任务

作业中的多个任务之间可以有 DAG 依赖关系。即前面的任务运行完成后, 后面的任务才开始运行。

3. 任务实现

这两种任务是在提交的 Job 中指定相关字段实现的,下面以 Python SDK 为例给出实现方式,代码的完整程序见快速开始。

3.1 并发任务实现

在提交的 Job 中,填写 InstanceCount 字段。指明任务需要的实例数。该字段就是实现任务的并发功能。

  1. from batchcompute.resources import (
  2. JobDescription, TaskDescription, DAG
  3. )
  4. # create my_task
  5. my_task = TaskDescription()
  6. my_task.InstanceCount = 3 #指定需要实例数:3台VM

如果并发任务需要处理不同片段的数据,这个时候在需要运行的任务程序中 使用环境变量: BATCH_COMPUTE_DAG_INSTANCE_ID(实例 ID)来区分,就可以处理不同片段的数据。下面的示例程序是快速开始的count代码,假设输入数据已经放在oss中。您需要下载oss的sdk

  1. import oss2 #oss sdk
  2. from conf import conf
  3. import os
  4. import json
  5. endpoint = os.environ.get('BATCH_COMPUTE_OSS_HOST') #OSS Host
  6. auth = oss2.Auth(conf['access_key_id'], conf['access_key_secret'])
  7. def download_file(oss_path, filename):
  8. (bucket, key) = parse_oss_path(oss_path)
  9. bucket_tool = oss2.Bucket(auth, endpoint, bucket)
  10. bucket_tool.get_object_to_file(key, filename)
  11. def upload_file(filename, oss_path):
  12. (bucket, key) = parse_oss_path(oss_path)
  13. bucket_tool = oss2.Bucket(auth, endpoint, bucket)
  14. bucket_tool.put_object_from_file(key,filename)
  15. def put_data(data, oss_path):
  16. (bucket, key) = parse_oss_path(oss_path)
  17. bucket_tool = oss2.Bucket(auth, endpoint, bucket)
  18. bucket_tool.put_object(key, data)
  19. def parse_oss_path(oss_path):
  20. s = oss_path[len('oss://'):]
  21. [bucket, key] = s.split('/',1)
  22. return (bucket,key)
  23. def main():
  24. # instance_id: should be start from 0
  25. instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
  26. data_path = conf['data_path']
  27. split_results = 'split_results'
  28. filename = 'part_%s.txt' % instance_id
  29. pre = data_path[0: data_path.rfind('/')]
  30. print('download form: %s/%s/' % (pre, split_results))
  31. # 1. download a part
  32. download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)
  33. # 2. parse, calculate
  34. with open(filename) as f:
  35. txt = f.read()
  36. m = {
  37. 'INFO': 0,
  38. 'WARN': 0,
  39. 'ERROR': 0,
  40. 'DEBUG': 0
  41. }
  42. for k in m:
  43. m[k] = len(re.findall(k, txt))
  44. print(m)
  45. # 3. upload result to oss
  46. upload_to = '%s/count_results/%s.json' % (pre, instance_id )
  47. print('upload to %s' % upload_to)
  48. put_data(json.dumps(m), upload_to)

3.2 DAG任务实现

在提交的job中,填写 Dependencies 字段。指明任务之间的依赖关系。下面的图中,首先理清各个任务之间的依赖关系,count1 和 count2 是并行的任务,它们依赖 split 任务,merge任务依赖 count1 和 count2。

img

依据上面的依赖关系,在Job中可以这样描述:

  1. from batchcompute.resources import (
  2. JobDescription, TaskDescription, DAG, AutoCluster
  3. )
  4. job_desc = JobDescription()
  5. #以下省略task的描述内容
  6. split = TaskDescription()
  7. count1 = TaskDescription()
  8. count2 = TaskDescription()
  9. merge = TaskDescription()
  10. task_dag = DAG()
  11. task_dag.add_task(task_name="split", task=split)
  12. task_dag.add_task(task_name="count1", task=count1)
  13. task_dag.add_task(task_name="count2", task=count2)
  14. task_dag.add_task(task_name="merge", task=merge)
  15. task_dag.Dependencies = {
  16. 'split': ['count1', 'count2'],
  17. 'count1': ['merge'],
  18. 'count2': ['merge']
  19. }
  20. job_desc.DAG = task_dag

整个作业的任务执行顺序是:

  • split 运行完成后,count1 和 count2 同时开始运行,count1 和 count2 都完成后,merge 才开始运行。
  • merge 运行完成,整个作业结束。

本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
上一篇:配置邮箱解析_云解析服务 DNS_快速入门 下一篇:没有了

推荐图文


随机推荐