python应用暴露自定义指标到prometheus的核心是使用prometheus-client库,1. 安装库:pip install prometheus_client;2. 定义指标类型:包括counter(计数器,仅增)、gauge(计量器,可增减)、summary(摘要,客户端计算分位数)和histogram(直方图,服务端计算分位数);3. 使用标签(labels)增加维度,但需避免高基数标签(如用户id、请求id)以防止性能问题;4. 通过start_http_server(端口)启动http服务,在/metrics路径暴露指标;5. 对于短生命周期任务,使用pushgateway推送指标而非拉取模式;6. 遵循命名规范(如snake_case、_total结尾、含单位)并提供清晰描述。此外,可构建自定义exporter监控第三方系统,或结合web框架中间件实现自动化指标采集,所有方案均基于prometheus-client实现。

在Python中操作Prometheus,最直接也最常用的方式就是利用官方提供的
prometheus-client
要开始用Python暴露Prometheus指标,首先你需要安装
prometheus-client
pip install prometheus_client
然后,在你的Python应用中,你可以定义不同类型的指标,并更新它们的值。一个典型的设置会是这样:
立即学习“Python免费学习笔记(深入)”;
from prometheus_client import start_http_server, Counter, Gauge, Summary, Histogram
import random
import time
# 定义一个计数器,用于统计请求总数
REQUEST_COUNT = Counter('my_app_requests_total', 'Total number of requests to my application.')
# 定义一个计量器,用于实时显示某个值(比如当前活跃用户数)
ACTIVE_USERS = Gauge('my_app_active_users', 'Current number of active users.')
# 定义一个摘要,用于记录请求处理时间,提供分位数信息
REQUEST_LATENCY_SECONDS = Summary('my_app_request_latency_seconds', 'Request latency in seconds.')
# 定义一个直方图,也用于记录请求处理时间,但提供可配置的桶
REQUEST_DURATION_HISTOGRAM = Histogram('my_app_request_duration_seconds', 'Request duration in seconds histogram.')
def process_request():
"""模拟一个处理请求的函数"""
REQUEST_COUNT.inc() # 每次调用,计数器加1
# 随机模拟处理时间
processing_time = random.uniform(0.01, 0.5)
time.sleep(processing_time)
# 更新摘要和直方图
REQUEST_LATENCY_SECONDS.observe(processing_time)
REQUEST_DURATION_HISTOGRAM.observe(processing_time)
# 模拟活跃用户数变化
if random.random() < 0.5:
ACTIVE_USERS.inc()
else:
ACTIVE_USERS.dec()
if __name__ == '__main__':
# 启动一个HTTP服务器,在端口8000暴露指标
# 注意:这通常在一个单独的线程中运行,不会阻塞主应用逻辑
start_http_server(8000)
print("Prometheus metrics server started on port 8000")
# 模拟主应用逻辑持续运行
while True:
process_request()
time.sleep(0.1) # 模拟请求间隔这段代码展示了如何初始化不同类型的指标,并在一个模拟的业务逻辑中更新它们。
start_http_server(8000)
http://localhost:8000/metrics
当我们在谈论Python应用如何暴露自定义指标时,核心其实就是利用
prometheus-client
Counter
Gauge
Summary
Histogram
Counter
.inc()
.inc(amount)
Counter
Gauge
.inc()
.dec()
.set(value)
Summary
Histogram
Summary
Histogram
Summary
Histogram
histogram_quantile
Histogram
在实际操作中,指标的命名规范也非常重要。Prometheus社区有一些约定俗成的规则,比如使用
snake_case
_total
_seconds
_bytes
from prometheus_client import Counter, Gauge, Summary, Histogram, generate_latest
from http.server import BaseHTTPRequestHandler, HTTPServer
import time
import random
# 带标签的计数器,可以按不同的HTTP方法统计
HTTP_REQUESTS_TOTAL = Counter(
'http_requests_total',
'Total HTTP requests received',
['method', 'endpoint'] # 定义标签
)
# 计量器,用于表示一个瞬时值,例如缓存命中率
CACHE_HIT_RATIO = Gauge('cache_hit_ratio', 'Current cache hit ratio')
# 直方图,用于记录API响应时间,并自定义桶
API_RESPONSE_TIME_SECONDS = Histogram(
'api_response_time_seconds',
'API response time in seconds',
buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, float('inf')]
)
class MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/metrics':
self.send_response(200)
self.send_header('Content-type', 'text/plain; version=0.0.4; charset=utf-8')
self.end_headers()
self.wfile.write(generate_latest()) # 获取所有注册的指标数据
else:
self.send_response(404)
self.end_headers()
def run_mock_api():
"""模拟一个带指标更新的API服务"""
while True:
method = random.choice(['GET', 'POST', 'PUT'])
endpoint = random.choice(['/users', '/products', '/orders'])
# 更新计数器,带标签
HTTP_REQUESTS_TOTAL.labels(method=method, endpoint=endpoint).inc()
# 模拟API处理时间
response_time = random.uniform(0.01, 1.0)
API_RESPONSE_TIME_SECONDS.observe(response_time)
# 随机更新缓存命中率
CACHE_HIT_RATIO.set(random.uniform(0.7, 0.99))
time.sleep(random.uniform(0.1, 0.5))
if __name__ == '__main__':
# 在单独的线程中启动HTTP服务器
from threading import Thread
metrics_server_thread = Thread(target=lambda: HTTPServer(('0.0.0.0', 8000), MetricsHandler).serve_forever())
metrics_server_thread.daemon = True # 设置为守护线程,主程序退出时自动退出
metrics_server_thread.start()
print("Metrics server running on http://localhost:8000/metrics")
# 运行模拟API
run_mock_api()
这段代码展示了如何使用标签(labels)来增加指标的维度,这在Prometheus中非常强大,可以让你根据不同的维度(比如HTTP方法、API端点)来切分和聚合数据。同时,也展示了如何自定义
Histogram
在使用
prometheus-client
首先,最常见的陷阱就是高基数(High Cardinality)问题。如果你在指标的标签中使用了像用户ID、请求ID、会话ID或者时间戳这类非常动态且唯一的值,那么Prometheus在存储和查询这些指标时会面临巨大的压力。每一个独特的标签组合都会被视为一个新的时间序列。想象一下,如果你的应用每秒处理数千个请求,每个请求都有一个唯一的ID作为标签,那Prometheus的存储空间很快就会爆炸,查询也会变得异常缓慢。我记得有一次,我们不小心把一个日志ID加到了某个核心指标的标签里,结果不到半天,Prometheus的磁盘使用率就飙升了。所以,我的建议是:对标签的设计要非常谨慎,只使用那些具有业务意义且基数有限的维度,比如HTTP方法、状态码、API路径(但要避免包含动态参数的路径)、服务版本、部署区域等。
其次,是关于指标更新的频率与性能。虽然
prometheus-client
再来就是短生命周期任务的指标暴露。
prometheus-client
from prometheus_client import push_to_gateway, Counter
import random
import time
# 定义一个计数器
BATCH_JOB_RUNS_TOTAL = Counter('batch_job_runs_total', 'Total runs of the batch job.')
BATCH_JOB_FAILURES_TOTAL = Counter('batch_job_failures_total', 'Total failures of the batch job.')
def run_batch_job():
"""模拟一个批处理任务"""
BATCH_JOB_RUNS_TOTAL.inc()
print("Batch job started...")
# 模拟任务执行时间
time.sleep(random.uniform(1, 5))
if random.random() < 0.2: # 20% 失败率
BATCH_JOB_FAILURES_TOTAL.inc()
print("Batch job failed!")
else:
print("Batch job completed successfully.")
# 任务结束后,将指标推送到Pushgateway
# job参数是必须的,用于标识这个批处理任务
# 可以添加分组键(groupingkey)来进一步区分,比如按实例ID
push_to_gateway('localhost:9091', job='my_python_batch_job', grouping_key={'instance': 'my_server_1'}, registry=None)
print("Metrics pushed to Pushgateway.")
if __name__ == '__main__':
# 假设Pushgateway运行在localhost:9091
run_batch_job()这段代码展示了如何使用
push_to_gateway
registry=None
job
grouping_key
job
最后,一个小的优化点是,确保你的指标描述(docstring)清晰明了。这不仅方便你自己,也方便团队里的其他人理解这些指标的含义。一个好的指标描述能省去很多沟通成本。
当提到Python与Prometheus的集成,
prometheus-client
prometheus-client
首先,是Prometheus Pushgateway,前面已经提到了。它并不是一个替代
prometheus-client
prometheus-client
其次,是自定义Prometheus Exporter。虽然
prometheus-client
prometheus-client
举个例子,假设你想监控一个没有Prometheus接口的遗留系统。你可以写一个Python脚本:
prometheus-client
from prometheus_client import Gauge, start_http_server
import time
import random
# 假设这是从某个遗留系统获取到的数据
LEGACY_SYSTEM_STATUS = Gauge('legacy_system_status', 'Status of the legacy system (1=ok, 0=error)')
LEGACY_SYSTEM_QUEUE_DEPTH = Gauge('legacy_system_queue_depth', 'Current depth of the legacy system queue')
def fetch_legacy_data():
"""模拟从遗留系统获取数据"""
# 实际场景中,这里会是调用API、SSH命令或数据库查询
status = 1 if random.random() > 0.1 else 0 # 10% 几率出错
queue_depth = random.randint(0, 100)
return status, queue_depth
def run_exporter():
while True:
status, queue_depth = fetch_legacy_data()
LEGACY_SYSTEM_STATUS.set(status)
LEGACY_SYSTEM_QUEUE_DEPTH.set(queue_depth)
print(f"Fetched legacy data: Status={status}, QueueDepth={queue_depth}")
time.sleep(10) # 每10秒更新一次
if __name__ == '__main__':
start_http_server(8001) # Exporter在8001端口暴露
print("Legacy system exporter started on port 8001")
run_exporter()这个简单的例子展示了如何构建一个独立的Exporter。它定期从一个模拟的“遗留系统”获取数据,并将其转换为Prometheus指标。
最后,一些高级框架或库的集成。虽然这本质上还是基于
prometheus-client
prometheus-client
prometheus-client
prometheus-client
以上就是Python怎样操作Prometheus?prometheus-client的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号