通过消费组(ConsumerGroup)消费日志数据有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需要专注于业务逻辑即可。

基本概念

概念 说明
消费组 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个Logstore中的日志数据,消费者之间不会重复消费数据。每个Logstore最多创建30个消费组。
消费者 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。
在日志服务中,一个Logstore下面会有多个Shard,通过消费组消费日志数据就是将Shard分配给一个消费组下面的消费者,分配方式遵循以下原则:
  • 每个Shard只会分配到一个消费者。
  • 一个消费者可以同时拥有多个Shard。

新的消费者加入一个消费组,这个消费组下面的Shard从属关系会调整,以达到消费负载均衡的目的,但是仍遵循分配原则。

通过消费组消费可以保存Checkpoint,在程序故障恢复时能从断点继续消费,从而保证数据不会被重复消费。

操作步骤

消费组消费可以通过Java、Python及Go语言实现,以下操作步骤以Java为例。

  1. 添加maven依赖。
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.33</version>
    </dependency>
  2. 创建main.java文件。
    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // 日志服务域名,请您根据实际情况填写。
        private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // 日志服务项目名称,请您根据实际情况填写。
        private static String sProject = "ali-cn-hangzhou-sls-admin";
        // 日志库名称,请您根据实际情况填写。
        private static String sLogstore = "sls_operation_log";
        // 消费组名称,请您根据实际情况填写。
        private static String sConsumerGroup = "consumerGroupX";
        // 消费数据的用户AccessKey和AccessKey ID信息,请您根据实际情况填写。
        private static String sAccessKeyId = "";
        private static String sAccessKey = "";
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,此时消费者名称可以使用机器IP地址来区分。maxFetchLogGroupSize是每次从服务端获取的LogGroup最大数目,使用默认值即可,如有调整请注意取值范围(0,1000]。
            LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            //Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            //调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
            worker.shutdown();
            //ClientWorker运行过程中会生成多个异步的任务,Shutdown完成后请等待还在执行的任务安全退出,建议sleep配置为30秒。
            Thread.sleep(30 * 1000);
        }
    }
  3. 创建SampleLogHubProcessor.java文件。
    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 记录上次持久化Checkpoint的时间。
        private long mLastCheckTime = 0;
    
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
        public String process(List<LogGroupData> logGroups,
                              ILogHubCheckPointTracker checkPointTracker) {
            // 打印已获取的数据。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
            if (curTime - mLastCheckTime > 30 * 1000) {
                try {
                    //参数为true表示立即将Checkpoint更新到服务端;false表示将Checkpoint缓存在本地,默认间隔60秒会将Checkpoint更新到服务端。
                    checkPointTracker.saveCheckPoint(true);
                } catch (LogHubCheckPointException e) {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }
    
        // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            //将Checkpoint立即保存到服务端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 生成一个消费实例。
            return new SampleLogHubProcessor();
        }
    }
    更多信息,请参见JavaPythonGo

查看消费组状态

  1. 登录日志服务控制台
  2. Project列表区域,单击目标Project。
  3. 日志存储 > 日志库页签中,单击目标Logstore左侧的展开节点 > 数据消费
  4. 单击目标ConsumerGroup,即可查看每个Shard消费数据的进度。
您也可以通过API或SDK查看消费进度,以Java SDK为例,代码如下:
package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        //获取Logstore下的所有消费组,若消费组不存在,则长度为0。
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for(ConsumerGroup c: consumerGroups){
            //打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
            System.out.println("名称: " + c.getConsumerGroupName());
            System.out.println("心跳超时时间: " + c.getTimeout());
            System.out.println("按序消费: " + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                //时间返回精确到毫秒,类型为长整型。
                System.out.println("最后一次消费数据的时间: " + cp.getUpdateTime());
                System.out.println("消费者名称: " + cp.getConsumer());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "尚未开始消费";
                else{
                    //Unix时间戳,单位是秒,输出时请注意格式化。
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    }
                    catch(LogException e){
                        if(e.GetErrorCode() == "InvalidCursor")
                            consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
                        else{
                            //internal server error
                            throw e;
                        }
                    }
                }
                System.out.println("消费进度: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                }
                catch(LogException e){
                    //do nothing
                }
                //Unix时间戳,单位是秒,输出时请注意格式化。
                System.out.println("最后一条数据到达时刻: " + endPrg);
            }
        }
    }
}

相关操作

  • 异常诊断
    建议您为消费者程序配置Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties典型配置:
    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
    配置Log4j后,执行消费者程序可以看到类似如下异常信息:
    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • 通过消费组消费从某个时间开始的日志数据
    // consumerStartTimeInSeconds表示消费这个时间点之后的数据。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    说明
    • 请您按照消费需求,使用不同的构造方法。
    • 当服务端已保存Checkpoint,则开始消费位置以服务端保存的Checkpoint为准。
  • 重置Checkpoint
    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

RAM用户授权

使用RAM用户访问需要设置消费组相关的权限策略,授权步骤请参见授权RAM用户

授权的Action如下:
Action Description Resource
log:GetCursorOrData(GetCursorPullLogs 根据时间获取游标(cursor)。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup 在指定的Logstore上创建一个消费组。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup 查询指定Logstore的所有消费组。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:UpdateCheckPoint 更新指定消费组的某个Shard的Checkpoint。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat 为指定消费者发送心跳到服务端。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:UpdateConsumerGroup 修改指定消费组属性。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupUpdateCheckPoint 获取指定消费组消费的某个或者所有Shard的Checkpoint。 acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
例如,Project所在地域为cn-hangzhou,Project名称为project-test,消费的Logstore名称为logstore-test,Project的所属阿里云账号ID为174649****602745,消费组名称为consumergroup-test,则需要为RAM用户设置以下权限。
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}