通过消费组(ConsumerGroup)消费日志数据有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需要专注于业务逻辑即可。
基本概念
概念 | 说明 |
---|---|
消费组 | 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个Logstore中的日志数据,消费者之间不会重复消费数据。每个Logstore最多创建30个消费组。 |
消费者 | 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。 |
- 每个Shard只会分配到一个消费者。
- 一个消费者可以同时拥有多个Shard。
新的消费者加入一个消费组,这个消费组下面的Shard从属关系会调整,以达到消费负载均衡的目的,但是仍遵循分配原则。
通过消费组消费可以保存Checkpoint,在程序故障恢复时能从断点继续消费,从而保证数据不会被重复消费。
操作步骤
消费组消费可以通过Java、Python及Go语言实现,以下操作步骤以Java为例。
查看消费组状态
- 登录日志服务控制台。
- 在Project列表区域,单击目标Project。
- 在 页签中,单击目标Logstore左侧的 。
- 单击目标ConsumerGroup,即可查看每个Shard消费数据的进度。
package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = "";
static String accesskey = "";
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
//获取Logstore下的所有消费组,若消费组不存在,则长度为0。
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
//打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
System.out.println("名称: " + c.getConsumerGroupName());
System.out.println("心跳超时时间: " + c.getTimeout());
System.out.println("按序消费: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
//时间返回精确到毫秒,类型为长整型。
System.out.println("最后一次消费数据的时间: " + cp.getUpdateTime());
System.out.println("消费者名称: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "尚未开始消费";
else{
//Unix时间戳,单位是秒,输出时请注意格式化。
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
else{
//internal server error
throw e;
}
}
}
System.out.println("消费进度: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
//do nothing
}
//Unix时间戳,单位是秒,输出时请注意格式化。
System.out.println("最后一条数据到达时刻: " + endPrg);
}
}
}
}
相关操作
- 异常诊断
建议您为消费者程序配置Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties典型配置:
log4j.rootLogger = info,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
配置Log4j后,执行消费者程序可以看到类似如下异常信息:[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159) com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
- 通过消费组消费从某个时间开始的日志数据
// consumerStartTimeInSeconds表示消费这个时间点之后的数据。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds); // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);
说明- 请您按照消费需求,使用不同的构造方法。
- 当服务端已保存Checkpoint,则开始消费位置以服务端保存的Checkpoint为准。
- 重置Checkpoint
public static void updateCheckpoint() throws Exception { Client client = new Client(host, accessId, accessKey); long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000; ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore)); for (Shard shard : response.GetShards()) { int shardId = shard.GetShardId(); String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor(); client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor); } }
RAM用户授权
使用RAM用户访问需要设置消费组相关的权限策略,授权步骤请参见授权RAM用户。
Action | Description | Resource |
---|---|---|
log:GetCursorOrData(GetCursor,PullLogs) | 根据时间获取游标(cursor)。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup | 在指定的Logstore上创建一个消费组。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ListConsumerGroup | 查询指定Logstore的所有消费组。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:UpdateCheckPoint | 更新指定消费组的某个Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat | 为指定消费者发送心跳到服务端。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup | 修改指定消费组属性。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupUpdateCheckPoint | 获取指定消费组消费的某个或者所有Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}