Merge pull request #1 from didi/dev

merge didi dev
This commit is contained in:
EricZeng
2021-01-23 11:16:35 +08:00
committed by GitHub
13 changed files with 289 additions and 23 deletions

View File

@@ -48,13 +48,16 @@
![dingding_group](./docs/assets/images/common/dingding_group.jpg)
钉钉群ID32821440
## OCE认证
OCE是一个认证机制和交流平台为Logi-KafkaManager生产用户量身打造我们会为OCE企业提供更好的技术支持比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等如果贵司Logi-KafkaManager上了生产[快来加入吧](http://obsuite.didiyun.com/open/openAuth)
## 项目成员
### 内部核心人员
`iceyuhui``liuyaguang``limengmonty``zhangliangmike``nullhuangyiming``zengqiao``eilenexuzhe``huangjiaweihjw`
`iceyuhui``liuyaguang``limengmonty``zhangliangmike``nullhuangyiming``zengqiao``eilenexuzhe``huangjiaweihjw``zhaoyinrui``marzkonglingxu``joysunchao`
### 外部贡献者

Binary file not shown.

After

Width:  |  Height:  |  Size: 270 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 589 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 652 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 511 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 672 KiB

View File

@@ -0,0 +1,65 @@
---
![kafka-manager-logo](../assets/images/common/logo_name.png)
**一站式`Apache Kafka`集群指标监控与运维管控平台**
---
# 动态配置管理
## 1、Topic定时同步任务
### 1.1、配置的用途
`Logi-KafkaManager`在设计上,所有的资源都是挂在应用(app)下面。 如果接入的Kafka集群已经存在Topic了那么会导致这些Topic不属于任何的应用从而导致很多管理上的不便。
因此需要有一个方式将这些无主的Topic挂到某个应用下面。
这里提供了一个配置会定时自动将集群无主的Topic挂到某个应用下面下面。
### 1.2、相关实现
就是一个定时任务,该任务会定期做同步的工作。具体代码的位置在`com.xiaojukeji.kafka.manager.task.dispatch.op`包下面的`SyncTopic2DB`类。
### 1.3、配置说明
**步骤一:开启该功能**
在application.yml文件中增加如下配置已经有该配置的话直接把false修改为true即可
```yml
# 任务相关的开关
task:
op:
sync-topic-enabled: true # 无主的Topic定期同步到DB中
```
**步骤二:配置管理中指定挂在那个应用下面**
配置的位置:
![sync_topic_to_db](./assets/dynamic_config_manager/sync_topic_to_db.jpg)
配置键:`SYNC_TOPIC_2_DB_CONFIG_KEY`
配置值(JSON数组)
- clusterId需要进行定时同步的集群ID
- defaultAppId该集群无主的Topic将挂在哪个应用下面
- addAuthority是否需要加上权限, 默认是false。因为考虑到这个挂载只是临时的我们不希望用户使用这个App同时后续可能移交给真正的所属的应用因此默认是不加上权限。
**注意这里的集群ID或者是应用ID不存在的话会导致配置不生效。该任务对已经在DB中的Topic不会进行修改**
```json
[
{
"clusterId": 1234567,
"defaultAppId": "ANONYMOUS",
"addAuthority": false
},
{
"clusterId": 7654321,
"defaultAppId": "ANONYMOUS",
"addAuthority": false
}
]
```

View File

@@ -7,7 +7,7 @@
---
# 夜莺监控集成
# 监控系统集成——夜莺
- `Kafka-Manager`通过将 监控的数据 以及 监控的规则 都提交给夜莺,然后依赖夜莺的监控系统从而实现监控告警功能。

View File

@@ -0,0 +1,54 @@
---
![kafka-manager-logo](../assets/images/common/logo_name.png)
**一站式`Apache Kafka`集群指标监控与运维管控平台**
---
# 监控系统集成
- 监控系统默认与 [夜莺] (https://github.com/didi/nightingale) 进行集成;
- 对接自有的监控系统需要进行简单的二次开发,即实现部分监控告警模块的相关接口即可;
- 集成会有两块内容,一个是指标数据上报的集成,还有一个是监控告警规则的集成;
## 1、指标数据上报集成
仅完成这一步的集成之后,即可将监控数据上报到监控系统中,此时已能够在自己的监控系统进行监控告警规则的配置了。
**步骤一:实现指标上报的接口**
- 按照自己内部监控系统的数据格式要求,将数据进行组装成符合自己内部监控系统要求的数据进行上报,具体的可以参考夜莺集成的实现代码。
- 至于会上报哪些指标,可以查看有哪些地方调用了该接口。
![sink_metrics](./assets/monitor_system_integrate_with_self/sink_metrics.jpg)
**步骤二:相关配置修改**
![change_config](./assets/monitor_system_integrate_with_self/change_config.jpg)
**步骤三:开启上报任务**
![open_sink_schedule](./assets/monitor_system_integrate_with_self/open_sink_schedule.jpg)
## 2、监控告警规则集成
完成**1、指标数据上报集成**之后,即可在自己的监控系统进行监控告警规则的配置了。完成该步骤的集成之后,可以在`Logi-KafkaManager`中进行监控告警规则的增删改查等等。
大体上和**1、指标数据上报集成**一致,
**步骤一:实现相关接口**
![integrate_ms](./assets/monitor_system_integrate_with_self/integrate_ms.jpg)
实现完成步骤一之后,接下来的步骤和**1、指标数据上报集成**中的步骤二、步骤三一致,都需要进行相关配置的修改即可。
## 3、总结
简单介绍了一下监控告警的集成,嫌麻烦的同学可以仅做 **1、指标数据上报集成** 这一节的内容即可满足一定场景下的需求。
**集成过程中有任何觉得文档没有说清楚的地方或者建议欢迎入群交流也欢迎贡献代码觉得好也辛苦给个star。**

View File

@@ -53,13 +53,13 @@
- 3、数据库时区问题。
检查MySQL的topic表查看是否有数据如果有数据那么再检查设置的时区是否正确。
检查MySQL的topic_metrics、broker_metrics表,查看是否有数据,如果有数据,那么再检查设置的时区是否正确。
---
### 5、如何对接夜莺的监控告警功能
- 参看 [kafka-manager 对接夜莺监控](../dev_guide/Intergration_n9e_monitor.md) 说明。
- 参看 [kafka-manager 对接夜莺监控](../dev_guide/monitor_system_integrate_with_n9e.md) 说明。
---

View File

@@ -0,0 +1,72 @@
---
![kafka-manager-logo](../assets/images/common/logo_name.png)
**一站式`Apache Kafka`集群指标监控与运维管控平台**
---
# Topic 指标说明
## 1. 实时流量指标说明
| 指标名称| 单位| 指标含义|
|-- |---- |---|
| messagesIn| 条/s | 每秒发送到kafka的消息条数 |
| byteIn| B/s | 每秒发送到kafka的字节数 |
| byteOut| B/s | 每秒流出kafka的字节数所有消费组消费的流量如果是Kafka版本较低这个还包括副本同步的流量 |
| byteRejected| B/s | 每秒被拒绝的字节数 |
| failedFetchRequest| qps | 每秒拉取失败的请求数 |
| failedProduceRequest| qps | 每秒发送失败的请求数 |
| totalProduceRequest| qps | 每秒总共发送的请求数与messagesIn的区别是一个是发送请求里面可能会有多条消息 |
| totalFetchRequest| qps | 每秒总共拉取消息的请求数 |
 
## 2. 历史流量指标说明
| 指标名称| 单位| 指标含义|
|-- |---- |---|
| messagesIn| 条/s | 近一分钟每秒发送到kafka的消息条数 |
| byteIn| B/s | 近一分钟每秒发送到kafka的字节数 |
| byteOut| B/s | 近一分钟每秒流出kafka的字节数所有消费组消费的流量如果是Kafka版本较低副本同步的流量 |
| byteRejected| B/s | 近一分钟每秒被拒绝的字节数 |
| totalProduceRequest| qps | 近一分钟每秒总共发送的请求数与messagesIn的区别是一个是发送请求里面可能会有多条消息 |
 
## 3. 实时耗时指标说明
**基于滴滴加强版Kafka引擎的特性可以获取Broker的实时耗时信息和历史耗时信息**
| 指标名称| 单位 | 指标含义 | 耗时高原因 | 解决方案|
|-- |-- |-- |-- |--|
| RequestQueueTimeMs| ms | 请求队列排队时间 | 请求多,服务端处理不过来 | 联系运维人员处理 |
| LocalTimeMs| ms | Broker本地处理时间 | 服务端读写数据慢,可能是读写锁竞争 | 联系运维人员处理 |
| RemoteTimeMs| ms | 请求等待远程完成时间对于发送请求如果ack=-1该时间表示副本同步时间对于消费请求如果当前没有数据该时间为等待新数据时间如果请求的版本与topic存储的版本不同需要做版本转换也会拉高该时间 | 对于生产ack=-1必然会导致该指标耗时高对于消费如果topic数据写入很慢该指标高也正常。如果需要版本转换该指标耗时也会高 | 对于生产可以考虑修改ack=1消费端问题可以联系运维人员具体分析 |
| ThrottleTimeMs| ms | 请求限流时间 | 生产/消费被限流 | 申请提升限流值 |
| ResponseQueueTimeMs| ms | 响应队列排队时间 | 响应多,服务端处理不过来 | 联系运维人员处理 |
| ResponseSendTimeMs| ms | 响应返回客户端时间 | 1下游消费能力差导致向consumer发送数据时写网络缓冲区过慢2消费lag过大一直从磁盘读取数据 | 1:提升客户端消费性能2: 联系运维人员确认是否读取磁盘问题 |
| TotalTimeMs| ms | 接收到请求到完成总时间,理论上该时间等于上述六项时间之和,但由于各时间都是单独统计,总时间只是约等于上述六部分时间之和 | 上面六项有些耗时高 | 具体针对高的指标解决 |
**备注由于kafka消费端实现方式消费端一次会发送多个Fetch请求在接收到一个Response之后就会开始处理数据使Broker端返回其他Response等待因此ResponseSendTimeMs并不完全是服务端发送时间有时会包含一部分消费端处理数据时间**
## 4. 历史耗时指标说明
**基于滴滴加强版Kafka引擎的特性可以获取Broker的实时耗时信息和历史耗时信息**
| 指标名称| 单位| 指标含义|
|-- | ---- |---|
| produceRequestTime99thPercentile|ms|Topic近一分钟发送99分位耗时|
| fetchRequestTime99thPercentile|ms|Topic近一分钟拉取99分位耗时|
| produceRequestTime95thPercentile|ms|Topic近一分钟发送95分位耗时|
| fetchRequestTime95thPercentile|ms|Topic近一分钟拉取95分位耗时|
| produceRequestTime75thPercentile|ms|Topic近一分钟发送75分位耗时|
| fetchRequestTime75thPercentile|ms|Topic近一分钟拉取75分位耗时|
| produceRequestTime50thPercentile|ms|Topic近一分钟发送50分位耗时|
| fetchRequestTime50thPercentile|ms|Topic近一分钟拉取50分位耗时|

View File

@@ -14,37 +14,119 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.MonitorRuleDO;
import java.util.List;
/**
* 监控系统接口
* @author zengqiao
* @date 20/5/21
*/
public interface MonitorService {
/**
* 创建告警规则
* @param monitorDTO 告警规则
* @param operator 操作人
* @return 操作状态结果
*/
ResultStatus createMonitorRule(MonitorRuleDTO monitorDTO, String operator);
/**
* 删除告警规则
* @param id 告警ID
* @param operator 操作人
* @return 操作状态结果
*/
ResultStatus deleteMonitorRule(Long id, String operator);
/**
* 修改告警规则
* @param monitorDTO 告警规则
* @param operator 操作人
* @return 操作状态结果
*/
ResultStatus modifyMonitorRule(MonitorRuleDTO monitorDTO, String operator);
/**
* 获取告警规则
* @param operator 操作人
* @return 监控告警规则概要信息
*/
List<MonitorRuleSummary> getMonitorRules(String operator);
/**
* 获取监控告警规则的详情信息
* @param monitorRuleDO 本地存储的监控告警规则概要信息
* @return
*/
Result<MonitorRuleDTO> getMonitorRuleDetail(MonitorRuleDO monitorRuleDO);
/**
* 依据主键ID, 获取存储于MySQL中的监控告警规则基本信息
* @param id 本地监控告警规则ID
* @return 本地监控告警规则信息
*/
MonitorRuleDO getById(Long id);
/**
* 依据策略ID, 获取存储于MySQL中的监控告警规则基本信息
* @param strategyId 策略ID
* @return 本地监控告警规则信息
*/
MonitorRuleDO getByStrategyId(Long strategyId);
/**
* 获取告警历史
* @param id 告警ID
* @param startTime 查询的起始时间
* @param endTime 查询的截止时间
* @return 告警历史
*/
Result<List<Alert>> getMonitorAlertHistory(Long id, Long startTime, Long endTime);
/**
* 查询告警详情
* @param alertId 告警ID
* @return 告警详情
*/
Result<MonitorAlertDetail> getMonitorAlertDetail(Long alertId);
/**
* 屏蔽告警
* @param monitorSilenceDTO 屏蔽的信息
* @param operator 操作人
* @return 屏蔽操作的结果
*/
Result createSilence(MonitorSilenceDTO monitorSilenceDTO, String operator);
/**
* 删除屏蔽策略
* @param silenceId 屏蔽ID
* @return 删除屏蔽告警的操作结果
*/
Boolean releaseSilence(Long silenceId);
/**
* 修改屏蔽告警的规则
* @param monitorSilenceDTO 屏蔽告警的信息
* @param operator 操作人
* @return 操作结果
*/
Result modifySilence(MonitorSilenceDTO monitorSilenceDTO, String operator);
/**
* 获取屏蔽策略
* @param strategyId 告警策略ID
* @return
*/
Result<List<Silence>> getSilences(Long strategyId);
/**
* 获取屏蔽详情
* @param silenceId 屏蔽ID
* @return
*/
Silence getSilenceById(Long silenceId);
/**
* 获取告警接收组
* @return
*/
List<NotifyGroup> getNotifyGroups();
}

View File

@@ -14,44 +14,34 @@ public abstract class AbstractMonitorService {
* 监控策略的增删改查
*/
public abstract Integer createStrategy(Strategy strategy);
public abstract Boolean deleteStrategyById(Long strategyId);
public abstract Boolean modifyStrategy(Strategy strategy);
public abstract List<Strategy> getStrategies();
public abstract Strategy getStrategyById(Long strategyId);
/**
* 告警的查
* 告警被触发后, 告警信息的查
*/
public abstract List<Alert> getAlerts(Long strategyId, Long startTime, Long endTime);
public abstract Alert getAlertById(Long alertId);
/**
* 屏蔽的增删改查
* 告警被触发之后, 进行屏蔽时, 屏蔽策略的增删改查
*/
public abstract Boolean createSilence(Silence silence);
public abstract Boolean releaseSilence(Long silenceId);
public abstract Boolean modifySilence(Silence silence);
public abstract List<Silence> getSilences(Long strategyId);
public abstract Silence getSilenceById(Long silenceId);
/**
* 指标的上报和查询
*/
public abstract Boolean sinkMetrics(List<MetricSinkPoint> pointList);
public abstract Metric getMetrics(String metric, Long startTime, Long endTime, Integer step, Properties tags);
/**
* 告警组
* 告警组获取
*/
public abstract List<NotifyGroup> getNotifyGroups();
/**
* 监控指标的上报和查询
*/
public abstract Boolean sinkMetrics(List<MetricSinkPoint> pointList);
public abstract Metric getMetrics(String metric, Long startTime, Long endTime, Integer step, Properties tags);
}