通过内置的投递任务模板,快速创建投递任务。本文介绍创建投递任务的操作方法。

步骤一:初始化日志服务Client

LogClient是日志服务的Python客户端,用于管理Project、Logstore等日志服务资源。使用Python SDK发起日志服务请求,您需要初始化一个Client实例。示例代码如下所示:

# Setup basic client
# !pip install -U matplotlib
import time
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import aliyun.log as sls

# 日志服务的服务入口。更多信息,请参见服务入口。
endpoint = "cn-beijing.log.aliyuncs.com"

# 阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见访问密钥。
accessId = "YOUR_ACCESS_ID"
accessKey= "YOUR_ACCESS_KEY"
# Project名称。
project  = "YOUR_SLS_PROJECT"
# MetricStore名称。
metricstore = "YOUR_SLS_METRICSTORE"
# 保存巡检结果的Logstore。
sink_logstore = 'YOUR_SLS_LOGSTORE_FOR_RESULTS_WRITE' 
# 设置任务名称。
task_name = "YOUR_TASK_NAME" 
# 创建LogClient。
client = sls.LogClient(endpoint, accessId, accessKey)

步骤二:查询当前Logstore的投递任务及配置

在创建投递任务前,查询已存在的投递任务及配置。示例代码如下所示:

# 查询投递任务配置。

project  = "YOUR_SLS_PROJECT"
logstore = "YOUR_SLS_LOGSTORE"

ret = client.list_shipper(project, logstore)
for shipper_name in ret.get_body()['shipper']:
    ret = client.get_shipper(project, logstore, shipper_name)

    print("------------------------------------------")
    print(shipper_name)
    print(ret.get_body())

步骤三:创建投递任务

  • 创建MaxCompute投递任务
    示例代码如下所示:
    # 创建投递任务。
    from aliyun.log.logexception import LogException
    
    project  = "YOUR_SLS_PROJECT"
    logstore = "YOUR_SLS_LOGSTORE"
    
    # 投递参数的具体配置。更多信息,请参见通过日志服务投递日志到MaxCompute。
    shipper_config = {
            'shipperName': 'test_ship22',
            'targetType': 'odps',
            'targetConfiguration': {
                'bufferInterval': 1800,
                'enable': True,
                'fields': ['__time__', '__source__', '__topic__', 'content'],
                'odpsEndpoint': 'http://odps-ext.aliyun-inc.com/api',
                'odpsProject': 'TS_DL',
                'odpsTable': 'test_odps',
                'partitionColumn': ['__time__'],
                'partitionTimeFormat': 'yyyy_MM_dd_HH_mm'
            }
    }
    
    try:
        client.create_shipper(project, logstore, shipper_config)
    except LogException as ex:
        if 'shipperName already exists' in ex.get_error_message():
            # create index if index not exists
            ret = client.update_shipper(project, logstore, shipper_config)
        else:
            raise ex
  • 创建OSS投递任务
    示例代码如下:
    # 创建投递任务。
    # Create Logstore Shipper
    from aliyun.log.logexception import LogException
    
    project  = "YOUR_SLS_PROJECT"
    logstore = "YOUR_SLS_LOGSTORE"
    
    # 投递参数的具体配置。更多信息,请参见将日志服务数据投递到OSS。
    shipper_config = {
            'shipperName': 'to-oss',
            'targetConfiguration': {
                 'bufferInterval': 300,
                 'bufferSize': 256,
                 'compressType': 'snappy',
                 'enable': True,
                 'ossBucket': 'YOUR_oss-bucket',
                 'ossPrefix': 'prefix',
                 'pathFormat': '%Y/%m/%d/%H/%M',
                 'roleArn': 'acs:ram::YOUR_ALIYUN_UID:role/aliyunlogdefaultrole',
                 'storage': {'detail': {'enableTag': False}, 'format': 'json'}
             },
            'targetType': 'oss'}
    
    # 用户角色标识(ARN)。更多信息,请参见常见问题。
    try:
        client.create_shipper(project, logstore, shipper_config)
    except LogException as ex:
        if 'shipperName already exists' in ex.get_error_message():
            # create index if index not exists
            ret = client.update_shipper(project, logstore, shipper_config)
        else:
            raise ex