当前位置:主页 > 查看内容

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实

发布时间:2021-05-19 00:00| 位朋友查看

简介:作者:吴斌 发挥 Elastic Stack 在日志和实时数据分析计算领域的一些优势,对流媒体服务这样规模较大、实时性要求偏高,且分析、业务探索流程要求灵活的业务是一个比较百搭的选择。 数据逻辑架构如下 整体架构比较直观简单。我们省去了业务组建和存储层可能……

作者:吴斌

发挥 Elastic Stack 在日志和实时数据分析计算领域的一些优势,对流媒体服务这样规模较大、实时性要求偏高,且分析、业务探索流程要求灵活的业务是一个比较百搭的选择。

数据逻辑架构如下

整体架构比较直观简单。我们省去了业务组建和存储层可能会用到的其他引擎,把目光主要集中在 Elasticsearch 上。

日志采集

日志的收集在正式进入数据管道前,可以选择落地或者直接吐到消息队列。这里采集的内容也主要分成 2 大部分

CDN/网络访问日志业务打点数据

业务打点的数据可以根据需求采集,实时部分主要会聚焦在,例如卡顿等这样的用户体验指标。网络访问的日志通常比较通用,这里我们也先给出一个例子,相信大家看上去会比较熟悉。

那么根据这里样例的数据,Elasticsearch 可以轻松的利用内置的 processor 和聚合功能做快速的分析,后面我们会举例说明。

{
 "receiveTimestamp": "2021-04-28T14:30:17.90993285Z",
 "spanId": "blahblah",
 "trace": "blah/f5c7578feaf277dd9a8d96",
 "@timestamp": "2021-04-28T14:30:17.549287Z",
 "logName": "logs/requests",
 "jsonPayload": {
 "@type": "loadbalancing.type.LoadBalancerLogEntry",
 "latencySeconds": "0.001749s",
 "statusDetails": "response_from_cache",
 "cacheIdCityCode": "ABC",
 "cacheId": "ABC-abcabc123"
 "httpRequest": {
 "remoteIp": "10.0.0.1",
 "remoteIpIsp": {
 "ip": "10.0.0.2",
 "organization_name": "China Telecom",
 "asn": 8346,
 "network": "10.0.0.0/15"
 "requestMethod": "GET",
 "responseSize": "125621",
 "userAgent": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840",
 "frontendSrtt": 0.124,
 "cacheLookup": true,
 "geo": {
 "continent_name": "Asia",
 "country_iso_code": "CN",
 "country_name": "China",
 "location": {
 "lon": 123,
 "lat": 321
 "backendLatency": 0.001749,
 "requestUrl": "http://bindiego.com/vid/bindigo.m4s",
 "requestDomain": "bindiego.com",
 "cacheHit": true,
 "requestSize": "671",
 "requestProtocol": "http",
 "user_agent": {
 "original": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840",
 "os": {
 "name": "Android",
 "version": "10",
 "full": "Android 10"
 "name": "Android",
 "device": {
 "name": "Generic Smartphone"
 "version": "10"
 "status": 200,
 "resourceType": "m4s"
 }

这里就是最终导入到 Elasticsearch 里可分析的网络性能数据。针对这个数据,我们分别对它经过的管道和处理做一个简单快速的剖析。

消息队列

日志采集器会直接把数据打到消息队列,这里主要起到一个抗反压缓冲的作用。有些队列有很多附加的功能,例如存储和窗口计算,这里我们只使用最单纯的功能。因为后面我们选取了分布式计算引擎来做这这部分。

分布式计算引擎

分布式计算引擎,其实在整体实时数据分析业务里,扮演的着实是非常重要的角色。例如实时指标的窗口计算,迟报数据的修正等等。但在我们这个简单的场景下为了后面在 Elasticsearch 内更方便快捷的分析、过滤数据。

我们这里主要做了 ETL 和补全。例如把请求资源的域和资源类型提取出来,还有 CDN 缓存节点的区域代码等等。但是例如 IP 地址地理位置、用户设备类型和运营商(ISP)的反查,方便起见,我们利用了 Elasticsearch Ingest 节点预置的 Pipeline 去做。

这里要注意的就是,如果你的集群配置是全角色的节点,会对数据节点的性能有影响。建议使用独立的 ingest node 去做,且如果是在 K8S 上部署的话,还可以弹性扩容这组 nodeSet。

下面是 Ingest 节点配置举例

完整代码戳这里:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-pipeline.json
{
 "description": "IP user agent lookup",
 "processors": [
 "user_agent" : {
 "field" : "httpRequest.userAgent",
 "target_field" : "httpRequest.user_agent",
 "ignore_missing": true
 "geoip" : {
 "field" : "httpRequest.remoteIp",
 "target_field" : "httpRequest.geo",
 "ignore_missing": true
 "geoip" : {
 "field" : "httpRequest.remoteIp",
 "target_field" : "httpRequest.remoteIpIsp",
 "database_file" : "GeoLite2-ASN.mmdb",
 "ignore_missing": true
}
数据安全

数据安全这块顺带提一下,现在 Elasticsearch 的认证、授权都可以在 Basic License 里使用了,非常方便。这里简单提一下通讯这块,很多小伙伴用的是自签的证书。这个问题不大,经常被问到在使用 RestClient 开发的时候如何绕过去(例如在写计算引擎最后入库的时候)。其实方法也很简单,这里就给大家上个代码片段说明看下.

配置:https://github.com/elasticsearch-cn/elastic-on-gke#option-2-regional-tcp-lb

完整代码:https://github.com/cloudymoma/raycom/blob/streaming/src/main/java/bindiego/io/ElasticsearchIO.java#L273-L296

try {
 SSLContext context = SSLContext.getInstance("TLS");
 context.init(null, new TrustManager[] {
 new X509TrustManager() {
 public void checkClientTrusted(X509Certificate[] chain, String authType) {}
 public void checkServerTrusted(X509Certificate[] chain, String authType) {}
 public X509Certificate[] getAcceptedIssuers() { return null; }
 }, null);
 httpAsyncClientBuilder.setSSLContext(context)
 .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
} catch (NoSuchAlgorithmException ex) {
 logger.error("Error when setup dummy SSLContext", ex);
} catch (KeyManagementException ex) {
 logger.error("Error when setup dummy SSLContext", ex);
} catch (Exception ex) {
 logger.error("Error when setup dummy SSLContext", ex);
}
索引管理

索引生命周期管理 Elasticsearch 也提供了非常便利的工具。

生命周期配置,这里应该根据业务需求和节点规模综合考量

{
 "policy": {
 "phases": {
 "hot": {
 "actions": {
 "rollover": {
 "max_size": "20GB",
 "max_docs": 20000000,
 "max_age": "7d"
 "delete": {
 "min_age": "30d",
 "actions": {
 "delete": {}
}

索引模版

完整版:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-template.json

模版为每次生成的索引应用相同的配置,且指定了生命周期的政策文件和注入别名。

{
 "index_patterns": [
 "bindiego*"
 "order": 999,
 "settings": {
 "number_of_shards": 2,
 "number_of_replicas": 1,
 "final_pipeline": "bindiego",
 "index.lifecycle.name": "bindiego-policy",
 "index.lifecycle.rollover_alias": "bindiego-ingest"
 "mappings": {

最后我们配置了脚本一次性把上述配置应用,且在 Kibana 里为我们建立好查询的 index pattern

详细戳这里:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/init.sh数据面板

这里虽然是个人弱项,但是借助 Kibana 强大的可视化功能,可以根据第一部分整理出来的数据绘制实时面板。

完整可复用面板:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/gclb_dashboard.ndjson

部分截图:https://github.com/cloudymoma/raycom/tree/gcp-lb-log#dashboards-in-kibana

下面我举一些可能常被忽视的好用功能给大家打个样。

IP 反查出的 Geo 和 ISP 信息

通过这些信息,可以快速反映出各个运营商网络的情况,甚至一些盗链的线索初判断。

Vega 在 Kibana 里绘制数据

当我们觉得 Kibana 自身图表不够丰富的时候,可以借助 Vega。上面这个图就展示了来自不同地区的用户,分别命中 CDN 缓存点的流量分配。数据通过用 Elasticsearch 的 Composite Aggregation 提取。

Kibana TSVB

这个是我个人最喜欢的绘图方法了,可以非常灵活的对指标进行计算。下面这两个图表就展示过滤出直播业务的缓存命中、请求返回和缓存填充的数据量这些信息。

总结

由于业务数据的敏感性,这里就不列举细节了。但数据管道和治理,都依旧遵循同样的原则。整体数据管道的选型也非常灵活,采集部分即可以是 Beats 生态中的产品,也可以是自己开发的 agent。队列常用的有 Kafka 或者云上托管服务。分布式计算层因为业务比较简单,我比较推荐使用 Apache Beam,这样执行引擎可以在比如 Flink、Spark Streaming 和任何 Beam 支持的平台上相对灵活的切换。

今天我们给出的案例是一个非常简单,且可以快速复用的开源项目。

大家有任何需求和疑问也欢迎到社区一起交流、学习。


本文转自网络,原文链接:https://developer.aliyun.com/article/784164
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐