未经本人同意不得转载,若引用请附上原文链接。
项目使用flink来处理kafka中的无界流数据,采用的是flink on yarn的模式部署flink任务。最近做flink任务的监控过程中,踩了一些坑。下面是过程,只想看最终方案的直接拉到最后。
先说一下整体流程,flink官方支持通过配置,将运行过程中的jobManager,taskManager的各项指标推送给pushgateway(后面简称"pgw"),然后prometheus每隔30s从pgw主动拉取数据并存储;最后,grafana以prometheus作为数据源绘制各种指标的图标,同时alertmanager上配置了一些指标计算的表达式,当触发时会产生告警。
现在把重点放在 flink => pgw=> prometheus 这条链路上来
假设有2个flink job,因为是per job的模式,所以会在yarn上起两个flink 集群,每个集群都有1个jobManager和若干个taskManager(假设是2个)。最初的设计如下图
在测试场景中,每个作业只有少量的jobManager和taskManager,一直这样使用没问题。但是当部署到预生产环境时,需要调整作业的并行度,这时每个作业有1个jobManagerh和33个taskManager,作业也有将近10个。运行的过程中,发现pgw总是崩溃,导致指标数据中断。但是pgw本身也不支持调节运行各项参数,也找不到运行过程的打印的日志。怀疑其本身是个内存模型,推送的数据源过多导致数据量太大,内存溢出崩溃。(这里不得不吐槽pgw,挺不好用的)。同时也面临着单点问题(假使pgw能承载无限大的数据,但部署的节点有可能会宕机,导致指标都无法推送出去)。
于是从网上查询相关的解决方案,发现github上有个开源项目GitHub - ning1875/dynamic-sharding: 用动态分片解决pushgateway高可用 单点 HA问题,有一些解决思路。但对于我们来说不适用(公司对开源项目有比较严格的限制,且为此引入一整个分布式中间件consul有些矫枉过正的意思),但其提供的想法给了一些灵感。
最初的想法是使用“多点部署+nginx路由”的方式来解决HA的问题。在每个yarn的节点上全都部署一个pgw,然后将他们的ip+port作为上游资源池。flink侧将nginx节点的某个端口作为pgw的输入点,然后由nginx使用轮询的方式将请求依次发往不同的pgw,这样数据是均匀分布的,不会造成某个节点数据过多程序崩溃。然后prometheus从所有的pgw节点拉取数据,聚合在一起形成完整的数据。这样即使某个pgw节点挂了,但nginx有探活机制,它会将请求发往其他的pgw节点上。当宕机的pgw节点恢复后,nginx也会自动把请求分发给它。“完美”实现高可用。
然后新的问题就产生了,发现grafana上的图形针对同一个作业会生成多个图形。例如JobManager1的指标flink_jobmanager_job_uptime(上报flink作业运行时间的一项指标)会有多个图形。究其原因,正是因为nginx采用轮询的方式发送数据,导致JobManager1的指标数据可能在pgw1上有一部分,也可能在pgw2、pgw3上有一部分。
但prometheus抓取数据时,它在指标里增加了一项instance属性用于区分是从哪个pgw拿到的数据,导致本应该合并的数据没有合并,在promethues上用PromQL查询就会有多条数据(从而grafana上对于同一个指标会有多个图形)。但其实我们是不希望它区分数据从哪个pgw过来的,即对于同一个组件的同一个指标只希望有一条数据。那就需要把instance属性去除或掩盖。通过调研发现,在prometheus.yml里添加字段转换的配置可以实现
- job_name: "flink_monitor"scrape_interval: 30sstatic_configs:- targets: ['xxx.xxx.xxx.xxx:9091']metric_relabel_configs:- source_labels: [instance]target_laebl: 'instance'replacement: ''action: replace
相当于将instance的数值替换为空,这样就解决了多条数据的问题。
但是新的问题又产生了,发现grafana图形虽然只有一个了,但图形不正确。拿上述指标flink_jobmanager_job_uptime为例,它应该是个单调递增的直线,但实际显示它是乱七八糟的线,有上升也有下降,总运行时间怎么可能会下降呢,这不符合常识。
然后通过PromQL查询jobManager1的指标flink_jobmanager_job_uptime,是一条数据,但是,不停的点击查询发现,其数值不是递增的,有时候会变小。
又经过一番研究,才知道虽然instance被抹除了,但实际上prometheus还是从3个pwg节点上拿到了三份数据,表面上显示为一个指标项,但结果有三个。每次查询的时候随机返回一个。
这是jobManager推送指标数据到pgw的时间线,由于nginx的轮询机制,假设30s、60s、90s的指标发往了不同的pgw。而在prometheus的时间线上,就大有文章了。
由于prometheus是同时从多个pgw获取数据的,第30s时,由于只有pgw1有数据,拿到了指标值30,没问题。第60s时,pgw1和pgw2都有数据,同时拿到后prometheus将两个数据都存了下来,且pgw1返回的还是老数据,此时通过PromQL查询该指标时,你会发现值是在30和60两个值随机返回一个。到了90s时,pgw1、pgw2、pgw3都有了数据,而除了pgw3之外都是老数据,查询时也是随机挑一个返回。我晕~这样就能解释为啥grafana图形是乱的。
这pushgateway也太拉跨了,问你要数据你就给啊,也不看看你那都啥时候的数据了。
行,以解决问题为优先。如果,能把同一个组件(jobManager或taskManager)的数据发往同一个pgw,这样就不会产生多份数据的问题了,某个pgw只会独立的拥有某个jobManager的最新指标数据。我们知道nginx是有通过ip_hash,url_hash的能力,可能让同样的请求分发到同一个目标机器。
那就先研究一下,组件(jobManager或taskManager)上报指标时,它的请求是什么样的呢。通过nginx配置access_log后,打印出Request本身。以下是nginx.conf的部分配置(完整配置往后看),其中$request可以打印出请求url
# 定义日志打印格式
log_format main '$remote_addr - "$request" ''$status $upstream_addr';
通过转发的日志access_log发现,flink会调用以下API推送指标数据
PUT /metrics/job/flink-metrics112ljlkna02k1l29j210nkns HTTP/1.1
而中间这串flink-metrics112ljlkna02k1l29j210nkns,就是pgw的Group Key的概念(可以在pgw的前端页面上看到),每个独立的jobManager或taskManager都会拥有自己独一无二的Group Key。那答案就呼之欲出了,不需要解析报文体,请求的uri的唯一变化值就是Group Key,那只通过uri就能区分是哪个组件了。这不是完美契合nginx的uri_hash概念嘛,这样既解决了单点问题,又解决了数据混乱的问题。
说干就干,在nginx.conf上补充得到完整的配置。如下:
worker_processes 1;
events{worker_connections 1024;
}http {# 开启gzip压缩gzip on;# 下面这两项需要加,不加flink推送的指标数据过大,nginx转发会报错413# 配置请求体缓存区大小client_max_body_size 10m;# 配置客户端请求体最大值client_body_buffer_size 10m;# 配置上游服务器资源池upstream pushgateway_servers {# 配置按照uri进行hash,需要引入对应模块hash $request_uri# 配置pgw的实例ipserver xxx.xxx.xxx.xxx:9091;server xxx.xxx.xxx.xxx:9091;}# 定义日志打印格式log_format main '$remote_addr - "$request" ''$status $upstream_addr';server {listen 9099;server_name localhost;location / {proxy_pass http://pushgateway_servers; #使用上面的资源池proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# 使用上面的日志定义access_log logs/access.log main;}}
}
配置完成后重启,发现一切“美好”~ 我们来分析一下现在的流程。
每个组件调用nginx时uri里已经表明了身份,通过url_hash,只会去往固定的pgw。也就是全量的组件数据会均匀(假设hash的均匀)分布在所有的pgw上,每个pgw拥有的数据都是独一无二且最新的,prometheusc从所有pgw获取数据拼凑成完整数据。
如果有pgw节点掉线(假设pgw1),nginx对于同一个uri的请求后续也会发往某个具体的pgw新节点(假设pgw2),不会发送给多个节点。同时因为pgw节点掉线,prometheus也无法从对应的pgw获取(老)数据,则不会产生多个数据值的问题。最多会因为抖动过程中,prometheus获取不到数据,在grafana上展示一个空点,但这是可以接受的。
那现在完美了吗?我又要说但是了,但是当pgw1节点恢复后,nginx由于自带的探活机制,又会重新把JM1的指标上送给pgw1,现在又出现pgw1和pgw2同时拥有JM1的数据,且pgw2是老数据。
现在的问题又到了,pgw1恢复时pgw2关于JM2的数据如何删除呢。dynamic-sharding里提到的,当pgw节点宕机需要从consul上剔除该节点,即使后面恢复也不使用该节点,这一点在consul里可以做到,但在nginx里无法控制啊。如果想要节点恢复使用,就等同于在扩容节点,需要重启所有pgw,以清除所有pgw内存中保留的数据。等下,重启pgw就可以清除旧数据,那我在pgw1节点恢复时,触发机制去重启pgw2不就相当于删除了JM2的旧数据。
事实上,这样并不好做。首先nginx根本没有提供相应的配置能让你在节点切走和切回时插入你想要的操作,可以通过lua脚本切入整个转发流程,但学习和使用成本较高。
在pgw的前端页面上,右上角有按钮“Delete All Groups”是可以删除当前pgw的所有数据的,通过这种方式可以实现不重启进程也能完成数据的删除,即通过API请求发到对应的pgw就可以实现清除其内存中的数据,而且不需要身份认证。
PUT /api/v1/admin/wipe
但由于无法得知pgw1挂了后,对应的组件数据被nginx重新分配到哪个节点中。目前采用的折衷处理方式是,在pgw的服务启动脚步里增加一个广播API请求到所有其他pgw节点上,以删除它们内存中的数据。这虽然会使最终prometheus中有些指标会缺失一两个数据点,但总比错误数据效果好一些吧。如果你有其他更好的方案,也欢迎来讨论。
至此,也就基本完成了pgw的HA。总结一下,通过部署多个pgw节点,前置nginx做负载均衡,通过uri_hash确保数据独立且均匀分布在所有pgw节点上。然后通过prometheus连接所有的pgw节点读取数据聚合形成flink集群完整数据。在pgw节点宕机时,数据会漂移至其他pgw节点继续上报。但当pgw节点恢复时,通过启动脚步广播API的方式清除其他所有pgw节点的内存数据。