当前位置:主页 > 查看内容

Elasticsearch 智能巡检开发设计实践— Elastic Stack 实战手册

发布时间:2021-05-19 00:00| 位朋友查看

简介:作者:张妙成 项目背景 PaaS 下管理了大量集群,监控和告警能快速的让开发维护人员,知道系统已经发生故障,并且辅助高效排障。 但是无法提前预知集群的健康状况,开发人员和维护人员均无法在故障前及时作出调整。为了帮助用户及时的知道集群的健康状态,更……

作者:张妙成

项目背景

PaaS 下管理了大量集群,监控和告警能快速的让开发维护人员,知道系统已经发生故障,并且辅助高效排障。

但是无法提前预知集群的健康状况,开发人员和维护人员均无法在故障前及时作出调整。为了帮助用户及时的知道集群的健康状态,更好使用 Elasticsearch 集群,可以定期对集群进行指标检查并给出相应报告。巡检作业及时发现集群的健康问题,集群的配置是否合理,提前主动发现问题,能有效保证集群的稳定性、健状性,从而减少业务中断时间保证服务质量

为了解决集群健康状态提前预知困难的问题,可以通过抽取一些指标,进行定时检查达到健康诊断的目的。

巡检主要是对集群的各个指标检查,给出一份全方位的报告,并提供一定的推荐解决、优化方案。如阿里的 EYOU 平台阿里云 Elasticsearch 智能诊断系统)会系统的在 Elasticsearch 公有云进行各个指标的检查,并给出相应报告,极大的减小了风险,降低了维护成本。

智能管理系统不是一个独立的检查系统,而是一个与其他系统相结合的闭环系统,独立的巡检模块对各项指标进行检查分析,将结果通过 PaaS 系统展示给用户,并在 PaaS 中给予入口,用以帮助用户手动再次触发检查,增强实时性,提高用户体验。

本文将介绍智能巡检系统在整个 Elasticsearch 相关系统中的位置与意义,并从指标分析选取、异常标准的角度,主要阐述智能巡检系统的设计与实现。

巡检系统的结构

整个应用的框架如上:

Elasticsearch 集群在 K8S 环境中(实际生产大多是 K8S 环境与物理机、虚拟机环境共存,这里简化成最终要达到的统一环境),由 PaaS 平台进行统一管理。PaaS 的信息数据主要是与 DB 交互(PaaS 是与 DB 的唯一交互入口),用户主要与 PaaS平台进行交互。智能巡检系统信息收集模块(一组 Python Job)主要是 K8S 环境中的 Elasticsearch cluster、宿主机进行交互,数据报告信息通过 PaaS 平台存入 DB。监控使用 VictoriaMetrics( Prometheus 的高可用方案)作为存储,grafana 作为前端展示页面。监控可以配置 Elasticsearch 各项指标,其中与智能巡检相关是巡检异常数量的监控面板,用来给 OPS 观察巡检亚健康集群异常点的修复(优化)情况。PaaS 提供入口手动触发再次检查。指标选取简介

巡检的指标、异常阈值与告警配置的主要区别是,检的指标项会更加关心可能引发故障的某些现象和配置,参考阈值相对告警配置会更加宽松。巡检主要是通过指标的采集分析,得出一份相对全面的报告和推荐解决方案。为了报告的全面性与分析的准确性,巡检的指标项会与告警配置有一定相似或重复。

告警与巡检需要解决的问题不同,告警的目的是将异常指标恢复到正常状态,响应的实时性要求较高,而巡检的目的是预防故障、消除隐患、优化集群性能,以报告的形式推到平台和用户,不需要用户主动响应,只需解决问题后重新触发巡检。为推进优化,可将巡检报告中非健康指标配置成监控面板、告警。

集群健康程度可以从几个方面表现:cluster 层面、node 层面、shard 层面、index 层面、jvm 层面、threadpool 层面。如下为参考指标:

模块指标项异常阈值(参考)cluster 层面1.cluster status 2.pengding_task3.cpu_util(极差)4.query(极差)5.一次bulk请求的数量1.health为red、yellow2. pending_task数量count 100 3.cpu_util(max) - cpu_util(min) 50%4.query(max)/query(min) 25.indexing_total(stacked) /s 1000且indexing_total / thread_pool_write_completed 100node 层面1.uptime2.free_disk3.cpu_util4.node上shard数量1.date_now - uptime 1h2.free_disk 30%3.cpu_util 90%4.count(shard) = 0shard 层面1.number2.size of per shard1.每GB的heap超过20个shards2.搜索类集群(tag)单个shard_size 20g,日志类集群单个 shard_size 50gindex 层面1.replica2.dynamic mapping3.refresh_interval4.indices.refresh.total5.max_result_window1.存在index无replica2.dynamic != false dynamic != strict3.refresh_interval = -14.refresh_count 80/min5.max_result_window 10000jvm 层面1.jvm heap使用率2.jdk version一致性3.heap segment memory4.Full GC1.heap_util 90%2.各个节点 jdk 版本出现不一致3.segment memory占用heap heap_size的20% 4.出现full gcthreadpool 层面1.bulk reject数量2.search reject数量1.bulk rejected 0 2.search rejected 0

Elasticsearch 功能强大、使用方便,也就意味着对用户来说有很多的默认设置,用户使用的自由度很高,也就意味着开放的能力丰富,用户的使用对集群健康程度有着很大的影响。

所以指标选取需要从两个角度,一是现有的现象指标,二是常用不合理的配置指标。接下来对选取的指标进行简要逐一分析。

指标分析cluster 层面指标分析

cluster status

集群健康状态,检测到集群状态非 green,则说明巡检异常结果未处理,或者突发情况导致,此时巡检的意义是快速的给出推荐解决方案,让运维、集群 owner(开发)能够有处理的方案,并非一味依赖告警等待运维处理,最大限度的减少异常带来的影响;

非 green 状态下,首先检查的是节点个数是否符合预期,节点数量正常情况下,通常通过explain API进行分析。

GET /_cluster/allocation/explain
 "index":"index_name",
 "shard": 1,
 "primary":false
}

分片 allocation 是通过分配器和决策器来决定的,explain API 通过决策器的信息来体现unassigned shard 的异常原因,allocation 原理图如下:

详细流程图链接:https://www.processon.com/view/link/5f43b268e401fd5f24852544

该部分只要非 green 状态即为异常,智能巡检系统会分析出异常原因,并给出推荐解决方案到最终的报告中,如果是出现 red 场景或者 OOM 导致掉节点场景,则会由告警平台即时通知到用户和 OPS,用户和 OPS 可以通过在 PAAS 平台记录的报告,快速查看状态异常的分析;由于巡检系统非实时,可以手动触发智能巡检系统,快速得到最新分析报告与推荐解决方案。

推荐解决方案由两部分组成,

一些特定场景分析后的经验建议该集群所有unassigned shard通过explain API的查到的结果集。

示例代码见示例代码:https://www.teambition.com/project/601f63c9997fc9a15fb8e683/app/5eba5fba6a92214d420a3219/workspaces/601f659743c5e10046459556/docs/606285c1eaa1190001e688b1#608588d89645b900464132e3

pengding_task

pending_task 反应了 master 节点尚未执行的集群级别的更改任务(例如:创建索引,更新映射,分配分片)的列表。pending_task的任务是分级别的(优先级排序:IMMEDIATE URGENT HIGH NORMAL LOW LANGUID),只有当上一级别的任务执行完毕后,才会执行下一级别的任务,即当出现 HIGH 级别以上的 pending_task 任务时,备份和创建索引等低级别任务将延迟执行。

pending_task 过多会给 master 节点造成压力,大集群(大数据量、高并发)情况下,容易造成节点被踢出集群,甚至集群不响应的情况。pending_task 积压的场景一般出现在大集群中,由于 task 不能快速处理完,会长时间处于积压状态,且会越积压越多,所以异常阈值可以设置一个较大值,根据经验值,设置 pending_task 数量大于 100 为异常;

GET _cat/pending_tasks

节点最大 cpu_util 与最小 cpu_util 之差(极差)

cpu_util 是判断 Elasticsearch 集群健康程度的重要指标,直接影响集群的吞吐量、请求的响应时间。一般情况下集群各个节点 cpu_util 普遍过高的场景比较好处理,增加配置即可让集群恢复健康状态。

而单节点/部分节点出现 cpu_util 过高的情况则不容易定位,如 shard 分配不均匀、routing 设置不合理、宿主机异常都有可能引发该现象,且可能在程序上线一段时间后才出现,这时巡检结果对预警与分析有重大意义。

GET _cat/nodes?h=ip,cpu

为了能够有效起到预警作用,cpu_util 极差的阈值不宜过大,同时应该考虑到角色分离、冷热分离等场景,将极差的计算范围限制到同角色节点之间。

data节点最大 qps 与最小 qps 之差(极差)

节点 qps 指标受到 shard 分配、routing 的影响,极差过大代表集群数据分配不均,或有参数干预,集群负载不均衡。

为该指标设置阈值时,同样需要考虑到角色分离场景,同时由于业务的不同,不同集群 qps 相差巨大,可以通过换算成瞬时流量的百分比来做极差计算。

GET /_nodes/stats/indices,ingest/search

一次 bulk 请求的数量

bulk 请求适用于大写入场景,由于减少了大量的连接,bulk 的效率远高于单条 index。一次 bulk 请求涉及到的 doc 数量对性能有较大影响,过小容易造成线程池堆积,过大容易造成超时。

该指标的阈值设置,仅针对写入/更新频率较大的集群,由于集群配置的差异,可给每个配置区间内设置一个最小值作为巡检的阈值。

node 层面指标分析

uptime

集群节点重启往往是对集群性能有着较大影响,接收到的流量会异常,同时减少了一部分吞吐量,对于大集群而言,重建缓存、shard allocation 会对集群的响应造成影响。集群重启的原因以及重启带来的影响对于集群都是一个隐患。

GET _cat/nodes?h=ip,name,node.role,master,uptime

如果启动时间距离当前时间 1h 以内,则节点发生重启,提示用户检查重启原因、评估带来的影响。

free_disk

磁盘剩余空间最直观的就是影响数据写入,到达水位线(默认95%)后会进入 read only 状态,其次磁盘空间还会带来其他隐患,比如无法操作 forcemerge、甚至 deleteByQuery 也无法完成等。

GET _cat/allocation

出于成本与利用率的考虑,预期状态磁盘占比不应过低,一般 50% 以内为安全值,由于不同集群的特性可能会有差异,可以设置 70% 作为异常阈值。

cpu_util

CPU 对集群性能影响极大,高 CPU 的场景下,响应时间增加,可能出现大量超时(读写异常)。所以需要平衡 cpu 利用率(一般通过超分、调整配置两种方式)。

GET _cat/nodes?h=ip,name,cpu

由于业务流量存在周期性波峰波谷,所以阈值需要在安全范围内尽量设置大一点,例如设置阈值为"cpu_util = 90%"。

node上shard数

节点未分配 shard,则需要确认资源容量分配是否合理,该指标主要针对分片设置不合理导致的资源浪费。一般发生在分片数量设置不合理、迁移过程(存在 exclude)的场景下。故阈值可以设置为 "count(shard) = 0"。

GET _cat/allocation
shard层面指标分析

number

过多或过少的 shard 在一定场景下都会影响查询和写入性能。官方建议的合理的设置数量:每GB 的 heap 不超过 20个 shards; 比如 20GB heap,400个 shards, 30GB heap,不超过 600个shards。Elasticsearch 7.0 版本开始,集群中每个节点默认限制 1000个shard。

阈值按照官方建议值设定即可。

GET _cat/shards?h=index,shard,prirep,state,docs,store,ip,node

size of per shard

大量的小 shard 会影响写入和查询性能,且在同数据量情况下占用更多的内存和磁盘。单 shard 过大则有更多的弊端,例如查询耗时变长、不易于恢复、迁移、容易造成集群压力不均衡等。

通常单个 shard 的大小建议在 10GB - 65GB 之间 (经验值参考:搜索类控制在 20GB,日志类控制在 50BG),查询 API 同上。

官方建议shard size、count值参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/size-your-shards.htmlIndex 层面指标分析

replica

所有集群的 index 都应该有副本分片,没有副本分片的 index 在节点 crash 时会丢失数据。

GET index_name/_settings

当 number_of_replicas 为 0 时候异常情况。

动态 mapping

dynamic mapping 设置为 true 会使得 mapping 变得不可维护,且 mapping 源数据由 master 维护、分发,大量变更可能导致 master 压力过大,在高峰情况下,可能会使得积压大量task,引发集群不响应、踢出节点等问题。

GET /*/_mapping?format=json

巡检需要检查出 "dynamic=true(或默认)"的索引的集群,标记为异常。

refresh_interval

索引 refresh 频率是影响性能的一个因素,受到 refresh_interval 参数与 buffer 大小的影响,由于业务场景的差异,对 refresh 的设置可能大不相同,可将集群类型大致分为搜索类型与数据分析类型,根据类型的不同设置差异化的阈值,且集群不应该出现"refresh_interval = -1"的设置。

GET /*/_settings?include_defaults=true

indices.refresh.total

refresh 的频率影响着 segment 的生成速度与大小,而 segment 过多往往影响查询性能,并且需要消耗更多的内存和磁盘空间。由于默认值为"refresh_interval = 1s",不考虑 buffer 的影响可以认为 refresh 频率为 60/min,故巡检阈值可以设置到比默认值稍高,例如:count(refresh) = 80/min。

GET /_nodes/stats/indices,ingest/refresh

max_result_window

max_result_window 为单次请求返回 doc 的最大值,默认为 10000,该默认值的限制可以覆盖到所有正常的业务场景。一般是深度分页、全量查询、job 查询可能导致返回 doc 数大于10000,触发异常,而这些场景可以由 scroll、search after 来完成。故该指标阈值可以设置成该参数默认值。

GET /*/_settings?include_defaults=true
jvm 层面指标分析

jvm heap 使用率

jvm 堆的使用率过高有着 OutOfMemory 的风险,并使得 GC 频率过高,影响请求响应时间。由于使用的 G1 收集器,首次 GC 收集会在预估 GC 时间达到预定值的时候开始触发,则 heap 使用率的稳定值也随着参数设置而产生较大差异。而该参数主要是为了预防 OutOfMemory 异常,所以该指标阈值可以设置一个较大值,例如 "heap 90"。

GET /_nodes/stats/indices,jvm

jdk version 一致性

由于 Elasticsearch 的分布式属性,集群存在多节点,每个节点一个单独的实例,需要保证 jdk 版本一致。

jvm heap segment memory

segment memory 常驻 heap 内存,所以 segment memory 的增长会压缩其他对象的内存空间。segment memory 是每个 segment 倒排词典上层的一个前缀索引,即 FST 结构,该前缀索引会在 segment 不断的累积下逐渐增多。

为了防止其对 heap 内存过多的占用,需要对该值继续检查限制,由于 FST 结构对前缀索引进行大量压缩,正常状态下对 heap 占用较低,巡检阈值也可以设置较低,例如 20% heap_size。

GET /_nodes/stats/indices,ingest/segments

full gc

Elasticsearch 7.x 默认使用的 G1 垃圾收集器,所以一般会是 Young GC 或 Mixed GC,如果mixed GC 无法跟上新对象分配内存的速度,导致老年代填满无法继续进行 Mixed GC,于是使用 full GC 来收集整个 heap。G1 不提供 full GC,使用的是 serial old GC。所以该 full GC 是单线程串行的,且 stop the world,这对业务来说是致命的。所以该巡检的阈值为"count(full gc) 0"。

GET /_nodes/stats/indices,jvm
threadpool 层面指标分析

bulk reject 数量

bulk 出现 reject 意味着线程池中线程被完全占用,且队列也已经占满。该指标阈值可设置为 "count(bulk rejected) 0"。

GET _cat/thread_pool

search reject 数量

search 出现 reject 意味着线程池中线程被完全占用,且队列也已经占满。该指标阈值可设置为 "count(search rejected) 0",查询 API 同上。

结语

本章详细介绍了智能巡检系统的结构与指标选取、阈值确认,并给出 cluster status 指标采集分析的完整示例代码。有兴趣的读者可以将智能巡检系统通过 python 脚本或者 operator 的方式实现。由于 PAAS 系统下管理的大量差异巨大的 Elasticsearch 集群,巡检系统实现主要难点与重点,是抽象出合理的指标以及精细化实现。

上述介绍中可以看到部分指标项是固定场景下必现,且解决方案唯一且简单,例如 shard_limit场景,一般通过 API 调整 total_shards_per_node 参数即可恢复,因此这些场景可以设计成自动化修复,达到简单的“自愈”,解放开发维护人员的生产力。

示例代码
# -*- coding:utf-8 -*-
analyse cluster status exception,
匹配常用的case, 返回explain完整结果
import espaas_api
import argparse
import traceback
import json
import logging
import gevent
import time
import datetime
import requests
from libs.log import initlog
from gevent import monkey
monkey.patch_all()
timeout = 10
# 7.10 elasticsearch 有16种决策器
dict = {
 'same_shard': '默认配置下一个节点不允许分配同一shard的多个副本分片', # 不参与统一处理
 'shards_limit': '节点限制单索引shard数,调整相应索引的total_shards_per_node参数',
 'disk_threshold': '磁盘空间达到水位线无法分配shard,调整磁盘容量、节点数或者清理磁盘',
 'max_retry': '达到allocate最大重试次数,尝试调用API手动retry',
 'awareness': '副本分配过多,大于awareness的配置',
 'cluster_rebalance': '集群正在Rebalance,可忽略',
 'concurrent_rebalance': '集群当前正在Rebalance,可忽略',
 'node_version': '分配到的节点版本不一致,请联系管理员调整',
 'replica_after_primary_active': '主分片未active,等待并观察主分片',
 'filter': '未通过filter,请检查配置',
 'enable': '集群enable限制',
 'throttling': '集群正在恢复,需要按照优先级恢复 primary replica',
 'rebalance_only_when_active': 'Only allow rebalancing when all shards are active within the shard replication group',
 'resize': 'An allocation decider that ensures we allocate the shards of a target index for resize operations next to the source primaries',
 'restore_in_progress': 'This allocation decider prevents shards that have failed to be restored from a snapshot to be allocated',
 'snapshot_in_progress': 'This allocation decider prevents shards that are currently been snapshotted to be moved to other nodes'
def check_exception_status_reason_advice(cluster):
 # 省略结果数据入库代码
 res = {
 "status": "",
 "reason": "",
 "advice": "",
 "detail": ""
 # 1.health API
 # 2.node count check
 url = gen_url(cluster) + "/_cat/health?format=json"
 r = requests.get(url, auth=(cluster["username"], cluster["password"]), timeout=timeout)
 if r.status_code == 200:
 try:
 content = json.loads(r.content)
 cluster_status_res = content[0]
 res["status"] = cluster_status_res["status"]
 if cluster_status_res["status"] == "green":
 return res
 if cluster_status_res["node.total"] != cluster["nodes_count"]:
 res["reason"] = "node_left"
 res["advice"] = "请检查k8s环境中node数, 使用`kubectl describe po pod_name`检查crash reason"
 return res
 except:
 traceback.print_exc()
 else:
 res["status"] = "unknown"
 return res
 # 3.get UNASSIGNED shard
 get_all_shard_url = gen_url(cluster) + "_cat/shards?format=json"
 get_all_shard_res = requests.get(get_all_shard_url, auth=(cluster["username"], cluster["password"]),
 timeout=timeout)
 unassigned_shard_list = []
 unassigned_shard_final_list = []
 if get_all_shard_res.status_code == 200:
 try:
 shards = json.loads(get_all_shard_res.content)
 for shard in shards:
 if shard["state"] == "UNASSIGNED":
 unassigned_shard_list.append(shard)
 if len(unassigned_shard_list) 20:
 unassigned_shard_final_list = unassigned_shard_list[0:20]
 else:
 unassigned_shard_final_list = unassigned_shard_list
 except:
 traceback.print_exc()
 # 4.explain
 # 5.经验值提取出常用建议
 decider_list = []
 explanation_list = []
 has_advice = 0
 explain_res_all = []
 for shard_tmp in unassigned_shard_final_list:
 index_name_tmp = shard_tmp["index"]
 shard_pos = shard_tmp["shard"]
 shard_type = shard_tmp["prirep"]
 is_primary = False
 if shard_type == "p":
 is_primary = True
 explain_url = gen_url(cluster) + "/_cluster/allocation/explain"
 payload = {
 "index": index_name_tmp,
 "shard": shard_pos,
 "primary": is_primary
 explain_res = requests.post(explain_url, json.dumps(payload),
 auth=(cluster["username"], cluster["password"]),
 headers={"Content-Type": "application/json"}).json()
 explain_res_all.append(explain_res)
 # 整理单个分片所有的decider信息
 same_shard_count = 0
 for decisions_tmp in explain_res["node_allocation_decisions"]:
 for decider_tmp in decisions_tmp['deciders']:
 decider_list.append(decider_tmp['decider'])
 explanation_list.append(decider_tmp['explanation'])
 if decider_tmp['decider'] == "same_shard":
 same_shard_count = same_shard_count + 1
 tmp_string = ''
 explanation_list_string = tmp_string.join(explanation_list)
 if explanation_list_string.find("ik_max_word") != -1:
 res["reason"] = "max_retry"
 res["advice"] = "可能触发6.8.5Ik分词器bug,请使用其他兼容版本或根据issue fix"
 has_advice = 1
 break
 if explanation_list_string.find("cluster.routing.allocation.enable=primaries") == -1 \
 and explanation_list_string.find("cluster.routing.allocation.enable=replicas") == -1:
 print '正常'
 else:
 res["reason"] = "enable"
 res["advice"] = "cancel cluster.routing.allocation.enable=replicas/primaries settings"
 has_advice = 1
 break
 if 'same_shard' in decider_list and same_shard_count == cluster["nodes_count"]:
 res["reason"] = "same_shard"
 res["advice"] = "shard副本设置过多,调整副本数量"
 has_advice = 1
 break
 # 取decider与ES决策器keys的交集
 decider_and = list(set(decider_list).intersection(set(dict.keys())))
 reason_and = ''
 advice_and = ''
 if list(set(decider_list).intersection(set(dict.keys()))):
 for x in decider_and:
 if x != 'same_shard':
 reason_and += x + ';'
 advice_and += dict[x] + ';'
 res["reason"] = reason_and
 res["advice"] = advice_and
 if has_advice == 0:
 res["detail"] = explain_res_all
 return res
def gen_url(cluster):
 url = 'http://' + cluster["domain"] + ':' + str(cluster["http_port"]) + cluster["http_path"]
 if url.endswith("/"):
 url = url[:-1]
 return url
def main(cluster_id=None):
 try:
 if cluster_id:
 clusters = espaas_api.get("cluster", "/api/get_cluster_by?cluster_id=%s" % cluster_id)["data"]
 else:
 clusters = espaas_api.get("cluster", "/api/all_status_exception_cluster")["data"]
 jobs = []
 for cluster in clusters:
 try:
 jobs.append(gevent.spawn(check_exception_status_reason_advice, cluster))
 except:
 traceback.print_exc()
 logging.info("send job number: %s" % len(jobs))
 gevent.joinall(jobs)
 except:
 traceback.print_exc()
if __name__ == '__main__':
 parser = argparse.ArgumentParser()
 parser.add_argument("-l", default="-", help="log file")
 parser.add_argument("--level", default="info")
 args = parser.parse_args()
 initlog(level=args.level, log=args.l)
 main()

本文转自网络,原文链接:https://developer.aliyun.com/article/784163
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐