## Monitoring
The monitoring utils enable metrics to be surfaced from the Kafka applications
so that the Prometheus server can scrape them.
The Prometheus server can't dynamically discover pod IPs and scrape the
services directly, so we use a metrics cache instead.
The metrics cache is a StatefulSet with two services assigned to it.
One service is a normal service with a unique cluster IP address;
the Prometheus server scrapes this service endpoint.
The other is a headless service with no cluster IP,
which means the monitoring utility can resolve the IP addresses of each
of the backing pods and send metrics to all of them.
Because every cache pod receives every update, this setup gives us
high-availability guarantees.
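Concretely, resolving the headless service's DNS name returns one address per
backing pod. Here is a minimal sketch of that discovery step, assuming standard
Kubernetes DNS behavior; the function name and return format are illustrative,
not the library's actual API:
```python
import socket


def get_metrics_cache_pods(service_name, pod_port):
    """Resolve a headless service's DNS name to the addresses of its pods.

    A headless service has no cluster IP, so the DNS lookup returns one
    A record per backing pod; metrics can then be pushed to every pod.
    """
    _, _, pod_ips = socket.gethostbyname_ex(service_name)
    return [f'{ip}:{pod_port}' for ip in pod_ips]
```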
The monitoring utils are organized into two classes, `MetricsManager` and `MetricsPusher`.
The `MetricsManager` is a container for all of the metrics for the app,
and contains convenience methods for the three standardized metrics.
These metrics are:
- `messages_consumed`: The number of messages consumed by the app
- `messages_produced`: The number of messages produced by the app
- `message_errors`: The number of exceptions caught in the app (labeled by type)

The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
It determines the list of IP addresses for all of the metrics cache pods,
and sends the current values of all metrics to every pod.
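For reference, here is a rough sketch of what the three standardized metrics
could look like as `prometheus_client` gauges; the metric types and label
names are assumptions, not the library's exact definitions:
```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()

# The three standardized metrics, labeled by app name and job (pod hostname)
messages_consumed = Gauge(
    'messages_consumed', 'Messages consumed by the app', ['app', 'job'], registry=registry)
messages_produced = Gauge(
    'messages_produced', 'Messages produced by the app', ['app', 'job'], registry=registry)
message_errors = Gauge(
    'message_errors', 'Exceptions caught in the app', ['app', 'job', 'exception'], registry=registry)

# Incrementing a metric for a given app/pod
messages_consumed.labels(app='my_app', job='my_pod_hostname').inc()
```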
### Monitoring Setup Examples
The initialization and update loop for application monitoring will differ
from application to application based on their architecture.
The following examples should cover the standard designs we use.
#### Default Kafka Client Application
A Kafka application that directly relies on interacting with Producer or
Consumer clients should have its monitoring classes set up and its
pushing thread started in the main run function, then passed to the loop
function, as follows:
```python
import os

from confluent_kafka import Consumer, Producer

from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics


def run_function():
    # Client configs are elided here; Consumer and Producer each take a
    # librdkafka configuration dict in a real application
    consumer = Consumer()
    producer = Producer()

    metrics_pusher = MetricsPusher(
        os.environ['HOSTNAME'],
        os.environ['METRICS_SERVICE_NAME'],
        os.environ['METRICS_SERVICE_PORT'],
        os.environ['METRICS_POD_PORT'])
    metrics_manager = MetricsManager(
        job=os.environ['HOSTNAME'],
        app=os.environ['APP_NAME'],
        metrics_pusher=metrics_pusher)
    start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))

    try:
        while True:
            loop_function(consumer, producer, metrics_manager=metrics_manager)
    finally:
        consumer.close()
```
The `consume_message()` function from this library expects a metrics_manager object
as an argument, so that it can increment the `messages_consumed` metric.
The `messages_produced` metric needs to be incremented by the application itself
whenever a Kafka message is produced. The `inc_messages_produced()` convenience
method on the metrics_manager makes this easier,
since it automatically adds the necessary labels to the metric.
The application also needs to increment the `message_errors` metric
whenever an exception is caught.
An example loop function might look like this:
```python
import os
import logging

from nubium_utils import consume_message
from nubium_utils.custom_exceptions import NoMessageError


def loop_function(consumer, producer, metrics_manager):
    try:
        message = consume_message(consumer, int(os.environ['POLL_TIMEOUT']), metrics_manager)
        outgoing_key = message.value()['email_address']
        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
        metrics_manager.inc_messages_produced(1)
    except NoMessageError:
        pass
    except KeyError as error:
        metrics_manager.inc_message_errors(error)
        logging.debug('Message missing email address')
```
#### Flask Kafka Application
The setup is a little different for a Flask application.
The metrics_manager should be accessible through the app's configuration,
so that it can be accessed in route functions.
The preferred method for error monitoring is to hook into Flask's built-in
error handling, using the `@app.errorhandler` decorator.
Here is an example `create_app()` function:
```python
import flask
from werkzeug.exceptions import HTTPException

from .forms_producer_app import forms_producer
from .util_blueprint import app_util_bp


def create_app(config):
    """
    Creates app from config and needed blueprints

    :param config: (Config) object used to configure the flask app
    :return: (flask.App) the application object
    """
    app = flask.Flask(__name__)
    app.config.from_object(config)
    app.register_blueprint(forms_producer)
    app.register_blueprint(app_util_bp)

    @app.errorhandler(HTTPException)
    def handle_exception(e):
        """
        Increment error gauge on metrics_manager before returning error message
        """
        response = e.get_response()
        response.data = f'{e.code}:{e.name} - {e.description}'
        app.config['MONITOR'].inc_message_errors(e)
        return response

    @app.errorhandler(Exception)
    def unhandled_exception(error):
        app.logger.error(f'Unhandled exception: {error}')
        app.config['MONITOR'].inc_message_errors(error)
        return f'Unhandled exception: {error}', 500

    return app
```
The route functions for produced messages should increment the `messages_produced`
metric when necessary.
Example:
```python
@forms_producer.route('/', methods=["POST"])
@AUTH.login_required
def handle_form():
    """
    Ingests a dynamic form from Eloqua and produces it to the topic
    """
    values = request.json
    string_values = {key: str(value) for key, value in values.items()}

    LOGGER.debug(f'Processing form: {values}')
    current_app.config['PRODUCER'].produce(
        topic=current_app.config['TOPIC'],
        key=values['C_EmailAddress'],
        value={'form_data': string_values},
        on_delivery=produce_message_callback
    )
    current_app.config['MONITOR'].inc_messages_produced(1)
    return jsonify(success=True)
```
#### Faust Streams Application
Monitoring for a Faust streams application is much simpler.
The `FaustAppWrapper` base class in this library has built-in metrics_manager integration.
It sets metrics to the values of Faust's internally tracked metrics,
and increments the exception metric using Faust's error handling.
The standalone metrics-pushing thread isn't needed here;
instead, the `app.timer` decorator is used to define a pushing task
that runs asynchronously as part of the Faust app.
Simply import the `FaustAppWrapper` and define an app wrapper class from it
with your app's agents.
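For instance, a bare-bones Faust app can schedule the push with a timer like
this; the app name, broker, and the body of the timer function below are
placeholders rather than the wrapper's actual internals:
```python
import os

import faust

app = faust.App('metrics-demo', broker=os.environ.get('KAFKA_BROKER', 'kafka://localhost:9092'))


# Runs on the app's event loop every METRICS_PUSH_RATE seconds, replacing
# the standalone metrics-pushing thread used by the other app types
@app.timer(interval=float(os.environ.get('METRICS_PUSH_RATE', '10')))
async def push_metrics():
    # In the wrapper this would send the MetricsManager's current values
    # to the metrics cache via the MetricsPusher
    print('pushing metrics to the metrics cache')


if __name__ == '__main__':
    app.main()
```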
The metrics_manager and metrics pusher need to be initialized in the run file,
as in this example:
```python
import os

from nubium_utils import get_ssl_context, MetricsManager, MetricsPusher
from nubium_utils.faust_utils import get_config

from duplicates_filter.duplicates_filter_app import FaustApp
from utilities.avro_utils import get_avro_client

ssl_context = get_ssl_context()
app_config = get_config()

metrics_pusher = MetricsPusher(
    job=os.environ['HOSTNAME'],
    metrics_service_name=os.environ['METRICS_SERVICE_NAME'],
    metrics_service_port=os.environ['METRICS_SERVICE_PORT'],
    metrics_pod_port=os.environ['METRICS_POD_PORT'])
metrics_manager = MetricsManager(
    job=metrics_pusher.job,
    app=app_config['id'],
    metrics_pusher=metrics_pusher)

duplicate_filter = FaustApp(
    app_config,
    avro_client=get_avro_client(),
    metrics_manager=metrics_manager,
    metrics_pusher=metrics_pusher
)

duplicate_filter.app.main()
```