本文档将介绍如何使用 Java 版 SDK 来提交一个作业,目的是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。
本作业是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。
该作业包含 3 个任务: split, count 和 merge:
DAG图例:
下载本例所需的数据: log-count-data.txt
将 log-count-data.txt 上传到:
oss://your-bucket/log-count/log-count-data.txt
本示例将采用 Java 来编写作业任务,使用 maven 来编译,推荐使用 IDEA:http://www.jetbrains.com/idea/download/ 选择 Community 版本(免费).
示例程序下载:java-log-count.zip
这是一个 maven 工程。
运行命令编译打包:
mvn package
即可在 target 得到下面 3 个 jar 包:
batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
再将 3 个 jar 包,打成一个 tar.gz 压缩包,命令如下:
> cd target #进入 target 目录
> tar -czf worker.tar.gz *SNAPSHOT-*.jar #打包
运行以下命令,查看包的内容是否正确:
> tar -tvf worker.tar.gz
batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
本例将 worke.tar.gz 上传到 OSS 的 your-bucket 中:
oss://your-bucket/log-count/worker.tar.gz
在 pom.xml 中增加以下 dependencies:
<dependencies>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-batchcompute</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>3.2.3</version>
</dependency>
</dependencies>
提交作业需要指定集群 ID 或者使用匿名集群参数。本例子使用匿名集群方式进行。匿名集群需要配置 2 个参数, 其中:
在 OSS 中创建存储 StdoutRedirectPath(程序输出结果)和 StderrRedirectPath(错误日志)的文件路径,本例中创建的路径为
oss://your-bucket/log-count/logs/
Java SDK 提交程序模板如下,程序中具体参数含义请参照 SDK 接口说明。
Demo.java:
/*
* IMAGE_ID:ECS 镜像,由上文所述获取
* INSTANCE_TYPE: 实例类型,由上文所述获取
* REGION_ID:提交作业的地域,此项需与上文 OSS 存储 worker 的bucket 地域一致
* ACCESS_KEY_ID: AccessKeyId 可以由上文所述获取
* ACCESS_KEY_SECRET: AccessKeySecret 可以由上文所述获取
* WORKER_PATH:由上文所述打包上传的 worker 的 OSS 存储路径
* LOG_PATH:错误反馈和 task 输出的存储路径,logs 文件需事先自行创建
*/
import com.aliyuncs.batchcompute.main.v20151111.*;
import com.aliyuncs.batchcompute.model.v20151111.*;
import com.aliyuncs.batchcompute.pojo.v20151111.*;
import com.aliyuncs.exceptions.ClientException;
import java.util.ArrayList;
import java.util.List;
public class Demo {
static String IMAGE_ID = "img-ubuntu";; //这里填写您的 ECS 镜像 ID
static String INSTANCE_TYPE = "ecs.sn1.medium"; //根据 region 填写合适的 InstanceType
static String REGION_ID = "cn-shenzhen"; //这里填写 region
static String ACCESS_KEY_ID = ""; //"your-AccessKeyId"; 这里填写您的 AccessKeyId
static String ACCESS_KEY_SECRET = ""; //"your-AccessKeySecret"; 这里填写您的 AccessKeySecret
static String WORKER_PATH = ""; //"oss://your-bucket/log-count/worker.tar.gz"; // 这里填写您上传的 worker.tar.gz 的 OSS 存储路径
static String LOG_PATH = ""; // "oss://your-bucket/log-count/logs/"; // 这里填写您创建的错误反馈和 task 输出的 OSS 存储路径
static String MOUNT_PATH = ""; // "oss://your-bucket/log-count/";
public static void main(String[] args){
/** 构造 BatchCompute 客户端 */
BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
try{
/** 构造 Job 对象 */
JobDescription jobDescription = genJobDescription();
//创建 Job
CreateJobResponse response = client.createJob(jobDescription);
//创建成功后,返回 jobId
String jobId = response.getJobId();
System.out.println("Job created success, got jobId: "+jobId);
//查询 job 状态
GetJobResponse getJobResponse = client.getJob(jobId);
Job job = getJobResponse.getJob();
System.out.println("Job state:"+job.getState());
} catch (ClientException e) {
e.printStackTrace();
System.out.println("Job created failed, errorCode:"+ e.getErrCode()+", errorMessage:"+e.getErrMsg());
}
}
private static JobDescription genJobDescription(){
JobDescription jobDescription = new JobDescription();
jobDescription.setName("java-log-count");
jobDescription.setPriority(0);
jobDescription.setDescription("log-count demo");
jobDescription.setJobFailOnInstanceFail(true);
jobDescription.setType("DAG");
DAG taskDag = new DAG();
/** 添加 split task */
TaskDescription splitTask = genTaskDescription();
splitTask.setTaskName("split");
splitTask.setInstanceCount(1);
splitTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar");
taskDag.addTask(splitTask);
/** 添加 count task */
TaskDescription countTask = genTaskDescription();
countTask.setTaskName("count");
countTask.setInstanceCount(3);
countTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar");
taskDag.addTask(countTask);
/** 添加 merge task */
TaskDescription mergeTask = genTaskDescription();
mergeTask.setTaskName("merge");
mergeTask.setInstanceCount(1);
mergeTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar");
taskDag.addTask(mergeTask);
/** 添加 Task 依赖: split-->count-->merge */
List<String> taskNameTargets = new ArrayList();
taskNameTargets.add("merge");
taskDag.addDependencies("count", taskNameTargets);
List<String> taskNameTargets2 = new ArrayList();
taskNameTargets2.add("count");
taskDag.addDependencies("split", taskNameTargets2);
//dag
jobDescription.setDag(taskDag);
return jobDescription;
}
private static TaskDescription genTaskDescription(){
AutoCluster autoCluster = new AutoCluster();
autoCluster.setInstanceType(INSTANCE_TYPE);
autoCluster.setImageId(IMAGE_ID);
//autoCluster.setResourceType("OnDemand");
TaskDescription task = new TaskDescription();
//task.setTaskName("Find");
//如果使用 VPC,需要配置 cidrBlock, 请确保 IP 段不冲突
Configs configs = new Configs();
Networks networks = new Networks();
VPC vpc = new VPC();
vpc.setCidrBlock("192.168.0.0/16");
networks.setVpc(vpc);
configs.setNetworks(networks);
autoCluster.setConfigs(configs);
//打包上传的作业的 OSS 全路径
Parameters p = new Parameters();
Command cmd = new Command();
//cmd.setCommandLine("");
//打包上传的作业的 OSS 全路径
cmd.setPackagePath(WORKER_PATH);
p.setCommand(cmd);
//错误反馈存储路径
p.setStderrRedirectPath(LOG_PATH);
//最终结果输出存储路
p.setStdoutRedirectPath(LOG_PATH);
task.setParameters(p);
task.addInputMapping(MOUNT_PATH, "/home/input");
task.addOutputMapping("/home/output",MOUNT_PATH);
task.setAutoCluster(autoCluster);
//task.setClusterId(clusterId);
task.setTimeout(30000); /* 30000 秒*/
task.setInstanceCount(1); /** 使用 1 个实例来运行 */
return task;
}
}
正常输出样例:
Job created success, got jobId: job-01010100010192397211
Job state:Waiting
您可以用 SDK 中的 获取作业信息 方法获取作业状态:
//查询 job 状态
GetJobResponse getJobResponse = client.getJob(jobId);
Job job = getJobResponse.getJob();
System.out.println("Job state:"+job.getState());
Job 的 state 可能为:Waiting、Running、Finished、Failed、Stopped.
您可以登录 batchcompute 控制台 查看 job 状态。
Job 运行结束,您可以登录 OSS 控制台 查看your-bucket 这个 bucket 下面的这个文件:/log-count/merge_result.json。
内容应该如下:
{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}
您也可以使用 OSS 的 SDK 来获取结果。
上篇文章介绍了对对碰游戏的理论部分和介绍了JLabel、JButton、JPanel、ImageIco...
如今SEO已经成为时尚词,大大小小的公司都在谈SEO,一些无知老板动不动就大谈网...
Linux下DHCP服务器配置: 1、DHCP服务器的架设,在安装虚拟机的时候网卡选择桥接...
是否想发现可在下一个项目中使用的非典型CSS设计?下面是我最喜欢的一些。 NES.cs...
Nginx是一款小巧而高效的Web服务器软件,可帮您在Alibaba Cloud Linux 2.1903 LT...
做网站对于许多个人站长和中小型企业做好的选择莫过于 服务器租用 了。 服务器租...
本文转载自网络,原文链接:https://mp.weixin.qq.com/s/vlOUg46B5bcmToX-fjavJQ...
6月23日,在2021阿里巴巴研发效能峰会上,由阿里云云效团队20位专家共同撰写的《...
ACNA 的概念 ? 阿里巴巴为大量各行各业的企业客户提供了基于阿里云服务的解决方...
通过设置全局标签,RAM子账号可以快速筛选出符合要求的已授权云资源。支持使用全...