@@ -1,6 +1,42 @@

## v3.0.0-beta.1

**Documentation**

- Added documentation for the Task module
- Added an FAQ entry for the `Specified key was too long; max key length is 767 bytes` error
- Added an FAQ entry for the `ESIndexNotFoundException` error


## v3.0.0-beta

**Bug fixes**

- Fixed the Consumer search not stopping after clicking Stop
- Fixed an error when creating/editing role permissions
- Fixed the wrong status of the balance card on both the multi-cluster management page and the single-cluster detail page
- Fixed the version list not being sorted
- Fixed Controller information of Raft clusters being recorded over and over
- Fixed consumer-group descriptions failing to load on some Kafka versions
- Fixed the missing Topic name in the logs written when fetching partition offsets fails
- Fixed wrong GitHub image URLs and broken images
- Fixed the default address used by Broker being inconsistent with its comment
- Fixed pagination not taking effect on the Consumer list
- Fixed the missing default value of the operation_methods field in the operation-log table
- Fixed the move_broker_list field in the cluster-balance table not working
- Fixed logs repeatedly warning "not supported" when fetching KafkaUser and KafkaACL information
- Fixed curves dropping to the bottom when metric points are missing


**Experience improvements**

- Reduced front-end build time and bundle size by adding a chunk-splitting strategy for bundled dependencies
- Polished product styling and copy
- Made the number of ES clients configurable
- Reduced the large volume of MySQL key-conflict logs


**Capability improvements**

- Added a periodic task that proactively creates missing ES templates and indices, reducing the need for extra scripts
- Added the ability to choose which Broker address the JMX connection uses


## v3.0.0-beta.0

**1. Multi-cluster management**
@@ -1,4 +1,4 @@

-# KnowStreaming Task Module Overview
+# Task Module Overview

## 1. Task overview
@@ -1,6 +1,35 @@

## 6.2 Version Upgrade Manual

**Upgrading from `2.x` to `3.0.0`**

Note: to upgrade to a given version you must apply, in order, every change between your current version and the target version; only then will the system work properly.

### 6.2.0 Upgrading to `master`

None for now.

---

### 6.2.1 Upgrading to `v3.0.0-beta.1`

**SQL changes**

1. A listener-information column was added to the `ks_km_broker` table.
2. A default value of '' was set on the operation_methods column of the `logi_security_oplog` table.

Therefore, run the following SQL to update the tables:

```sql
ALTER TABLE `ks_km_broker`
ADD COLUMN `endpoint_map` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '监听信息' AFTER `update_time`;

ALTER TABLE `logi_security_oplog`
ALTER COLUMN `operation_methods` SET DEFAULT '';
```
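To confirm that the changes applied, a quick check can help; this is a minimal sketch, assuming the default KnowStreaming schema and table names used above:

```sql
-- Verify the new column exists on ks_km_broker
SHOW COLUMNS FROM `ks_km_broker` LIKE 'endpoint_map';

-- Verify the default value now set on logi_security_oplog.operation_methods
SELECT COLUMN_DEFAULT
FROM information_schema.COLUMNS
WHERE TABLE_NAME = 'logi_security_oplog' AND COLUMN_NAME = 'operation_methods';
```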

---

### 6.2.2 Upgrading from `2.x` to `v3.0.0-beta.0`

**Upgrade steps:**
@@ -109,3 +109,21 @@ SECURITY.TRICK_USERS

Once the two steps above are done, you can call login-protected interfaces directly.

One caveat: a bypassed user can still only call the interfaces they have permission for. An ordinary user, for example, can call the ordinary interfaces but not the operator-facing ones.

## 8.8 Specified key was too long; max key length is 767 bytes

**Cause:** different versions of the InnoDB engine ship different defaults for `innodb_large_prefix`: it defaults to OFF in MySQL 5.6 and to ON in 5.7.

With InnoDB, `innodb_large_prefix=OFF` and an Antelope row format (REDUNDANT or COMPACT), the index key prefix is limited to 767 bytes; with `innodb_large_prefix=ON` and a Barracuda row format (DYNAMIC or COMPRESSED), the limit is 3072 bytes.
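As a concrete illustration (using a hypothetical table, not part of KnowStreaming): under utf8mb4 each character takes up to 4 bytes, so an index over `VARCHAR(255)` needs 255 * 4 = 1020 bytes and exceeds the 767-byte prefix limit, while `VARCHAR(191)` needs 191 * 4 = 764 bytes and fits:

```sql
-- Fails with "Specified key was too long" when innodb_large_prefix is OFF:
-- CREATE TABLE t_demo_bad (name VARCHAR(255) CHARACTER SET utf8mb4, KEY idx_name (name)) ENGINE=InnoDB;

-- Fits within the 767-byte prefix limit (191 * 4 = 764 bytes):
CREATE TABLE t_demo_ok (
  name VARCHAR(191) CHARACTER SET utf8mb4,
  KEY idx_name (name)
) ENGINE=InnoDB;
```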

**Solutions:**

- Reduce the varchar length to below 767/4 = 191 characters.
- Switch the character set to latin1 (one character = one byte).
- Enable `innodb_large_prefix`, set the file format (`innodb_file_format`) to Barracuda, and use `row_format=dynamic` (see the sketch after this list).
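A minimal sketch of the third option, with `logi_security_oplog` as a stand-in for the affected table; the variable names come from the bullets above, and you should verify them against your MySQL version, since `innodb_large_prefix` and `innodb_file_format` were removed in MySQL 8.0:

```sql
SET GLOBAL innodb_large_prefix = ON;
SET GLOBAL innodb_file_format = 'Barracuda';
-- Re-create or alter the affected table with a dynamic row format:
ALTER TABLE `logi_security_oplog` ROW_FORMAT=DYNAMIC;
```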

## 8.9 An ESIndexNotFoundException error appears

**Cause:** the ES index templates have not been created.

**Solution:** run the init_es_template.sh script to create the ES index templates.
@@ -5,7 +5,6 @@ import com.didiglobal.logi.log.LogFactory;
 import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
 import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
 import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
-import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
@@ -21,6 +20,8 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
+
 @Component
 public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
     protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@@ -41,37 +42,37 @@ public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
     public void onApplicationEvent(BaseMetricEvent event) {
         if(event instanceof BrokerMetricEvent) {
             BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event;
-            send2es(KafkaMetricIndexEnum.BROKER_INFO,
+            send2es(BROKER_INDEX,
                     ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class)
             );
 
         } else if(event instanceof ClusterMetricEvent) {
             ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event;
-            send2es(KafkaMetricIndexEnum.CLUSTER_INFO,
+            send2es(CLUSTER_INDEX,
                     ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class)
             );
 
         } else if(event instanceof TopicMetricEvent) {
             TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event;
-            send2es(KafkaMetricIndexEnum.TOPIC_INFO,
+            send2es(TOPIC_INDEX,
                     ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class)
             );
 
         } else if(event instanceof PartitionMetricEvent) {
             PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event;
-            send2es(KafkaMetricIndexEnum.PARTITION_INFO,
+            send2es(PARTITION_INDEX,
                     ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class)
             );
 
         } else if(event instanceof GroupMetricEvent) {
             GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event;
-            send2es(KafkaMetricIndexEnum.GROUP_INFO,
+            send2es(GROUP_INDEX,
                     ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class)
            );
 
         } else if(event instanceof ReplicaMetricEvent) {
             ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event;
-            send2es(KafkaMetricIndexEnum.REPLICATION_INFO,
+            send2es(REPLICATION_INDEX,
                     ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class)
             );
         }
@@ -80,19 +81,19 @@ public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
     /**
     * Send to ES according to the monitoring dimension
     */
-    private boolean send2es(KafkaMetricIndexEnum stats, List<? extends BaseESPO> statsList){
+    private boolean send2es(String index, List<? extends BaseESPO> statsList){
         if (CollectionUtils.isEmpty(statsList)) {
             return true;
         }
 
         if (!EnvUtil.isOnline()) {
             LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
-                    stats.getIndex(), statsList.size());
+                    index, statsList.size());
         }
 
-        BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(stats);
+        BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
         if (Objects.isNull( baseMetricESDao )) {
-            LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", stats.getIndex());
+            LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
             return false;
         }
@@ -1,5 +1,10 @@
 package com.xiaojukeji.know.streaming.km.common.bean.entity.broker;
 
+import com.alibaba.fastjson.TypeReference;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
+import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO;
+import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
 import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -7,6 +12,7 @@ import lombok.NoArgsConstructor;
 import org.apache.kafka.common.Node;
 
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * @author didi
@@ -55,6 +61,11 @@ public class Broker implements Serializable {
      */
     private Integer status;
 
+    /**
+     * Listener information
+     */
+    private Map<String, IpPortData> endpointMap;
+
     public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) {
         Broker metadata = new Broker();
         metadata.setClusterPhyId(clusterPhyId);
@@ -78,9 +89,31 @@ public class Broker implements Serializable {
         metadata.setStartTimestamp(brokerMetadata.getTimestamp());
         metadata.setRack(brokerMetadata.getRack());
         metadata.setStatus(1);
+        metadata.setEndpointMap(brokerMetadata.getEndpointMap());
         return metadata;
     }
 
+    public static Broker buildFrom(BrokerPO brokerPO) {
+        Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class);
+        String endpointMapStr = brokerPO.getEndpointMap();
+        if (broker == null || endpointMapStr == null || endpointMapStr.isEmpty()) {
+            return broker;
+        }
+
+        // Populate the endpoint information from the stored JSON
+        Map<String, IpPortData> endpointMap = ConvertUtil.str2ObjByJson(endpointMapStr, new TypeReference<Map<String, IpPortData>>(){});
+        broker.setEndpointMap(endpointMap);
+        return broker;
+    }
+
+    public String getJmxHost(String endPoint) {
+        if (endPoint == null || endpointMap == null) {
+            return host;
+        }
+        IpPortData ip = endpointMap.get(endPoint);
+        // Fall back to the default host when no endpoint entry exists
+        return ip == null ? host : ip.getIp();
+    }
+
     public boolean alive() {
         return status != null && status > 0;
     }
@@ -27,6 +27,9 @@ public class JmxConfig implements Serializable {
 
     @ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
     private String token;
+
+    @ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL")
+    private String useWhichEndpoint;
 }
@@ -1,6 +1,6 @@
 package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
 
-import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -10,13 +10,13 @@ import java.util.Map;
 
 @Data
 @NoArgsConstructor
-public class PartitionOffsetParam extends ClusterPhyParam {
+public class PartitionOffsetParam extends TopicParam {
     private Map<TopicPartition, OffsetSpec> topicPartitionOffsets;
 
     private Long timestamp;
 
-    public PartitionOffsetParam(Long clusterPhyId, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
-        super(clusterPhyId);
+    public PartitionOffsetParam(Long clusterPhyId, String topicName, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
+        super(clusterPhyId, topicName);
         this.topicPartitionOffsets = topicPartitionOffsets;
         this.timestamp = timestamp;
     }
@@ -15,4 +15,12 @@ public class TopicParam extends ClusterPhyParam {
         super(clusterPhyId);
         this.topicName = topicName;
     }
+
+    @Override
+    public String toString() {
+        return "TopicParam{" +
+                "clusterPhyId=" + clusterPhyId +
+                ", topicName='" + topicName + '\'' +
+                '}';
+    }
 }
@@ -42,4 +42,9 @@ public class BrokerPO extends BasePO {
      * Broker status
      */
     private Integer status;
+
+    /**
+     * Listener information
+     */
+    private String endpointMap;
 }
@@ -29,6 +29,10 @@ public class MetricPointVO implements Comparable<MetricPointVO> {
     @Override
     public int compareTo(MetricPointVO o) {
+        if(null == o){return 0;}
+        if(null == this.getTimeStamp()
+                || null == o.getTimeStamp()){
+            return 0;
+        }
 
-        return this.getTimeStamp().intValue() - o.getTimeStamp().intValue();
+        // Compare as longs; truncating epoch-millis timestamps with intValue() overflows
+        return Long.compare(this.getTimeStamp(), o.getTimeStamp());
     }
@@ -0,0 +1,647 @@
package com.xiaojukeji.know.streaming.km.common.constant;

public class ESIndexConstant {

    public final static String TOPIC_INDEX = "ks_kafka_topic_metric";
    public final static String TOPIC_TEMPLATE = "{\n" +
            "  \"order\" : 10,\n" +
            "  \"index_patterns\" : [ \"ks_kafka_topic_metric*\" ],\n" +
            "  \"settings\" : { \"index\" : { \"number_of_shards\" : \"10\" } },\n" +
            "  \"mappings\" : {\n" +
            "    \"properties\" : {\n" +
            "      \"brokerId\" : { \"type\" : \"long\" },\n" +
            "      \"routingValue\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"topic\" : { \"type\" : \"keyword\" },\n" +
            "      \"clusterPhyId\" : { \"type\" : \"long\" },\n" +
            "      \"metrics\" : {\n" +
            "        \"properties\" : {\n" +
            "          \"BytesIn_min_15\" : { \"type\" : \"float\" },\n" +
            "          \"Messages\" : { \"type\" : \"float\" },\n" +
            "          \"BytesRejected\" : { \"type\" : \"float\" },\n" +
            "          \"PartitionURP\" : { \"type\" : \"float\" },\n" +
            "          \"HealthCheckTotal\" : { \"type\" : \"float\" },\n" +
            "          \"ReplicationCount\" : { \"type\" : \"float\" },\n" +
            "          \"ReplicationBytesOut\" : { \"type\" : \"float\" },\n" +
            "          \"ReplicationBytesIn\" : { \"type\" : \"float\" },\n" +
            "          \"FailedFetchRequests\" : { \"type\" : \"float\" },\n" +
            "          \"BytesIn_min_5\" : { \"type\" : \"float\" },\n" +
            "          \"HealthScore\" : { \"type\" : \"float\" },\n" +
            "          \"LogSize\" : { \"type\" : \"float\" },\n" +
            "          \"BytesOut\" : { \"type\" : \"float\" },\n" +
            "          \"BytesOut_min_15\" : { \"type\" : \"float\" },\n" +
            "          \"FailedProduceRequests\" : { \"type\" : \"float\" },\n" +
            "          \"BytesIn\" : { \"type\" : \"float\" },\n" +
            "          \"BytesOut_min_5\" : { \"type\" : \"float\" },\n" +
            "          \"MessagesIn\" : { \"type\" : \"float\" },\n" +
            "          \"TotalProduceRequests\" : { \"type\" : \"float\" },\n" +
            "          \"HealthCheckPassed\" : { \"type\" : \"float\" }\n" +
            "        }\n" +
            "      },\n" +
            "      \"brokerAgg\" : { \"type\" : \"keyword\" },\n" +
            "      \"key\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"timestamp\" : {\n" +
            "        \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
            "        \"index\" : true,\n" +
            "        \"type\" : \"date\",\n" +
            "        \"doc_values\" : true\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"aliases\" : { }\n" +
            "}";

    public final static String CLUSTER_INDEX = "ks_kafka_cluster_metric";
    public final static String CLUSTER_TEMPLATE = "{\n" +
            "  \"order\" : 10,\n" +
            "  \"index_patterns\" : [ \"ks_kafka_cluster_metric*\" ],\n" +
            "  \"settings\" : { \"index\" : { \"number_of_shards\" : \"10\" } },\n" +
            "  \"mappings\" : {\n" +
            "    \"properties\" : {\n" +
            "      \"routingValue\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"clusterPhyId\" : { \"type\" : \"long\" },\n" +
            "      \"metrics\" : {\n" +
            "        \"properties\" : {\n" +
            "          \"Connections\" : { \"type\" : \"double\" },\n" +
            "          \"BytesIn_min_15\" : { \"type\" : \"double\" },\n" +
            "          \"PartitionURP\" : { \"type\" : \"double\" },\n" +
            "          \"HealthScore_Topics\" : { \"type\" : \"double\" },\n" +
            "          \"EventQueueSize\" : { \"type\" : \"double\" },\n" +
            "          \"ActiveControllerCount\" : { \"type\" : \"double\" },\n" +
            "          \"GroupDeads\" : { \"type\" : \"double\" },\n" +
            "          \"BytesIn_min_5\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckTotal_Topics\" : { \"type\" : \"double\" },\n" +
            "          \"Partitions\" : { \"type\" : \"double\" },\n" +
            "          \"BytesOut\" : { \"type\" : \"double\" },\n" +
            "          \"Groups\" : { \"type\" : \"double\" },\n" +
            "          \"BytesOut_min_15\" : { \"type\" : \"double\" },\n" +
            "          \"TotalRequestQueueSize\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckPassed_Groups\" : { \"type\" : \"double\" },\n" +
            "          \"TotalProduceRequests\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckPassed\" : { \"type\" : \"double\" },\n" +
            "          \"TotalLogSize\" : { \"type\" : \"double\" },\n" +
            "          \"GroupEmptys\" : { \"type\" : \"double\" },\n" +
            "          \"PartitionNoLeader\" : { \"type\" : \"double\" },\n" +
            "          \"HealthScore_Brokers\" : { \"type\" : \"double\" },\n" +
            "          \"Messages\" : { \"type\" : \"double\" },\n" +
            "          \"Topics\" : { \"type\" : \"double\" },\n" +
            "          \"PartitionMinISR_E\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckTotal\" : { \"type\" : \"double\" },\n" +
            "          \"Brokers\" : { \"type\" : \"double\" },\n" +
            "          \"Replicas\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckTotal_Groups\" : { \"type\" : \"double\" },\n" +
            "          \"GroupRebalances\" : { \"type\" : \"double\" },\n" +
            "          \"MessageIn\" : { \"type\" : \"double\" },\n" +
            "          \"HealthScore\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckPassed_Topics\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckTotal_Brokers\" : { \"type\" : \"double\" },\n" +
            "          \"PartitionMinISR_S\" : { \"type\" : \"double\" },\n" +
            "          \"BytesIn\" : { \"type\" : \"double\" },\n" +
            "          \"BytesOut_min_5\" : { \"type\" : \"double\" },\n" +
            "          \"GroupActives\" : { \"type\" : \"double\" },\n" +
            "          \"MessagesIn\" : { \"type\" : \"double\" },\n" +
            "          \"GroupReBalances\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckPassed_Brokers\" : { \"type\" : \"double\" },\n" +
            "          \"HealthScore_Groups\" : { \"type\" : \"double\" },\n" +
            "          \"TotalResponseQueueSize\" : { \"type\" : \"double\" },\n" +
            "          \"Zookeepers\" : { \"type\" : \"double\" },\n" +
            "          \"LeaderMessages\" : { \"type\" : \"double\" },\n" +
            "          \"HealthScore_Cluster\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckPassed_Cluster\" : { \"type\" : \"double\" },\n" +
            "          \"HealthCheckTotal_Cluster\" : { \"type\" : \"double\" }\n" +
            "        }\n" +
            "      },\n" +
            "      \"key\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"timestamp\" : {\n" +
            "        \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
            "        \"type\" : \"date\"\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"aliases\" : { }\n" +
            "}";

    public final static String BROKER_INDEX = "ks_kafka_broker_metric";
    public final static String BROKER_TEMPLATE = "{\n" +
            "  \"order\" : 10,\n" +
            "  \"index_patterns\" : [ \"ks_kafka_broker_metric*\" ],\n" +
            "  \"settings\" : { \"index\" : { \"number_of_shards\" : \"10\" } },\n" +
            "  \"mappings\" : {\n" +
            "    \"properties\" : {\n" +
            "      \"brokerId\" : { \"type\" : \"long\" },\n" +
            "      \"routingValue\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"clusterPhyId\" : { \"type\" : \"long\" },\n" +
            "      \"metrics\" : {\n" +
            "        \"properties\" : {\n" +
            "          \"NetworkProcessorAvgIdle\" : { \"type\" : \"float\" },\n" +
            "          \"UnderReplicatedPartitions\" : { \"type\" : \"float\" },\n" +
            "          \"BytesIn_min_15\" : { \"type\" : \"float\" },\n" +
            "          \"HealthCheckTotal\" : { \"type\" : \"float\" },\n" +
            "          \"RequestHandlerAvgIdle\" : { \"type\" : \"float\" },\n" +
            "          \"connectionsCount\" : { \"type\" : \"float\" },\n" +
            "          \"BytesIn_min_5\" : { \"type\" : \"float\" },\n" +
            "          \"HealthScore\" : { \"type\" : \"float\" },\n" +
            "          \"BytesOut\" : { \"type\" : \"float\" },\n" +
            "          \"BytesOut_min_15\" : { \"type\" : \"float\" },\n" +
            "          \"BytesIn\" : { \"type\" : \"float\" },\n" +
            "          \"BytesOut_min_5\" : { \"type\" : \"float\" },\n" +
            "          \"TotalRequestQueueSize\" : { \"type\" : \"float\" },\n" +
            "          \"MessagesIn\" : { \"type\" : \"float\" },\n" +
            "          \"TotalProduceRequests\" : { \"type\" : \"float\" },\n" +
            "          \"HealthCheckPassed\" : { \"type\" : \"float\" },\n" +
            "          \"TotalResponseQueueSize\" : { \"type\" : \"float\" }\n" +
            "        }\n" +
            "      },\n" +
            "      \"key\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"timestamp\" : {\n" +
            "        \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
            "        \"index\" : true,\n" +
            "        \"type\" : \"date\",\n" +
            "        \"doc_values\" : true\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"aliases\" : { }\n" +
            "}";

    public final static String PARTITION_INDEX = "ks_kafka_partition_metric";
    public final static String PARTITION_TEMPLATE = "{\n" +
            "  \"order\" : 10,\n" +
            "  \"index_patterns\" : [ \"ks_kafka_partition_metric*\" ],\n" +
            "  \"settings\" : { \"index\" : { \"number_of_shards\" : \"10\" } },\n" +
            "  \"mappings\" : {\n" +
            "    \"properties\" : {\n" +
            "      \"brokerId\" : { \"type\" : \"long\" },\n" +
            "      \"partitionId\" : { \"type\" : \"long\" },\n" +
            "      \"routingValue\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"clusterPhyId\" : { \"type\" : \"long\" },\n" +
            "      \"topic\" : { \"type\" : \"keyword\" },\n" +
            "      \"metrics\" : {\n" +
            "        \"properties\" : {\n" +
            "          \"LogStartOffset\" : { \"type\" : \"float\" },\n" +
            "          \"Messages\" : { \"type\" : \"float\" },\n" +
            "          \"LogEndOffset\" : { \"type\" : \"float\" }\n" +
            "        }\n" +
            "      },\n" +
            "      \"key\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"timestamp\" : {\n" +
            "        \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
            "        \"index\" : true,\n" +
            "        \"type\" : \"date\",\n" +
            "        \"doc_values\" : true\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"aliases\" : { }\n" +
            "}";

    public final static String GROUP_INDEX = "ks_kafka_group_metric";
    public final static String GROUP_TEMPLATE = "{\n" +
            "  \"order\" : 10,\n" +
            "  \"index_patterns\" : [ \"ks_kafka_group_metric*\" ],\n" +
            "  \"settings\" : { \"index\" : { \"number_of_shards\" : \"10\" } },\n" +
            "  \"mappings\" : {\n" +
            "    \"properties\" : {\n" +
            "      \"group\" : { \"type\" : \"keyword\" },\n" +
            "      \"partitionId\" : { \"type\" : \"long\" },\n" +
            "      \"routingValue\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"clusterPhyId\" : { \"type\" : \"long\" },\n" +
            "      \"topic\" : { \"type\" : \"keyword\" },\n" +
            "      \"metrics\" : {\n" +
            "        \"properties\" : {\n" +
            "          \"HealthScore\" : { \"type\" : \"float\" },\n" +
            "          \"Lag\" : { \"type\" : \"float\" },\n" +
            "          \"OffsetConsumed\" : { \"type\" : \"float\" },\n" +
            "          \"HealthCheckTotal\" : { \"type\" : \"float\" },\n" +
            "          \"HealthCheckPassed\" : { \"type\" : \"float\" }\n" +
            "        }\n" +
            "      },\n" +
            "      \"groupMetric\" : { \"type\" : \"keyword\" },\n" +
            "      \"key\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"timestamp\" : {\n" +
            "        \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
            "        \"index\" : true,\n" +
            "        \"type\" : \"date\",\n" +
            "        \"doc_values\" : true\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"aliases\" : { }\n" +
            "}";

    public final static String REPLICATION_INDEX = "ks_kafka_replication_metric";
    public final static String REPLICATION_TEMPLATE = "{\n" +
            "  \"order\" : 10,\n" +
            "  \"index_patterns\" : [ \"ks_kafka_replication_metric*\" ],\n" +
            "  \"settings\" : { \"index\" : { \"number_of_shards\" : \"10\" } },\n" +
            "  \"mappings\" : {\n" +
            "    \"properties\" : {\n" +
            "      \"brokerId\" : { \"type\" : \"long\" },\n" +
            "      \"partitionId\" : { \"type\" : \"long\" },\n" +
            "      \"routingValue\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"clusterPhyId\" : { \"type\" : \"long\" },\n" +
            "      \"topic\" : { \"type\" : \"keyword\" },\n" +
            "      \"metrics\" : {\n" +
            "        \"properties\" : {\n" +
            "          \"LogStartOffset\" : { \"type\" : \"float\" },\n" +
            "          \"Messages\" : { \"type\" : \"float\" },\n" +
            "          \"LogEndOffset\" : { \"type\" : \"float\" }\n" +
            "        }\n" +
            "      },\n" +
            "      \"key\" : { \"type\" : \"text\", \"fields\" : { \"keyword\" : { \"ignore_above\" : 256, \"type\" : \"keyword\" } } },\n" +
            "      \"timestamp\" : {\n" +
            "        \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
            "        \"index\" : true,\n" +
            "        \"type\" : \"date\",\n" +
            "        \"doc_values\" : true\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"aliases\" : { }\n" +
            "}";

}
@@ -33,7 +33,7 @@ public class KafkaConstant {
 
     public static final Integer DATA_VERSION_ONE = 1;
 
-    public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 3000;
+    public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 5000;
 
     public static final Integer KAFKA_SASL_SCRAM_ITERATIONS = 8192;
@@ -1,54 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.enums.metric;

/**
 * @author: D10865
 * @description:
 * @date: Create on 2019/3/11 2:19 PM
 * @modified By D10865
 *
 * ES monitoring data for the different dimensions
 */
public enum KafkaMetricIndexEnum {

    /**
     * topic dimension
     */
    TOPIC_INFO("ks_kafka_topic_metric"),

    /**
     * cluster dimension
     */
    CLUSTER_INFO("ks_kafka_cluster_metric"),

    /**
     * broker dimension
     */
    BROKER_INFO("ks_kafka_broker_metric"),

    /**
     * partition dimension
     */
    PARTITION_INFO("ks_kafka_partition_metric"),

    /**
     * group dimension
     */
    GROUP_INFO("ks_kafka_group_metric"),

    /**
     * replication dimension
     */
    REPLICATION_INFO("ks_kafka_replication_metric"),

    ;

    private String index;

    KafkaMetricIndexEnum(String index) {
        this.index = index;
    }

    public String getIndex() {
        return index;
    }
}
@@ -130,6 +130,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
 
         // If the Broker is still alive, update its DB record
         BrokerPO newBrokerPO = ConvertUtil.obj2Obj(presentAliveBroker, BrokerPO.class);
+        if (presentAliveBroker.getEndpointMap() != null) {
+            newBrokerPO.setEndpointMap(ConvertUtil.obj2Json(presentAliveBroker.getEndpointMap()));
+        }
         newBrokerPO.setId(inDBBrokerPO.getId());
         newBrokerPO.setStatus(Constant.ALIVE);
         newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime());
@@ -203,7 +206,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
         lambdaQueryWrapper.eq(BrokerPO::getClusterPhyId, clusterPhyId);
         lambdaQueryWrapper.eq(BrokerPO::getBrokerId, brokerId);
 
-        return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
+        return Broker.buildFrom(brokerDAO.selectOne(lambdaQueryWrapper));
     }
 
     @Override
@@ -272,9 +275,8 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
     /**************************************************** private method ****************************************************/
 
     private List<Broker> listAllBrokersAndUpdateCache(Long clusterPhyId) {
-        List<Broker> allBrokerList = ConvertUtil.list2List(this.getAllBrokerPOsFromDB(clusterPhyId), Broker.class);
+        List<Broker> allBrokerList = getAllBrokerPOsFromDB(clusterPhyId).stream().map(elem -> Broker.buildFrom(elem)).collect(Collectors.toList());
         brokersCache.put(clusterPhyId, allBrokerList);
 
         return allBrokerList;
     }
@@ -102,7 +102,10 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
         AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
 
         try {
-            DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(Arrays.asList(groupName), new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(true));
+            DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(
+                    Arrays.asList(groupName),
+                    new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(false)
+            );
 
             return describeConsumerGroupsResult.all().get().get(groupName);
         } catch(Exception e){
@@ -207,7 +207,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
                 .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec));
 
         try {
-            return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp));
+            return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp));
         } catch (VCHandlerNotExistException e) {
             return Result.buildFailure(VC_HANDLE_NOT_EXIST);
         }
@@ -226,7 +226,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
                 .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec));
 
         try {
-            return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp));
+            return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp));
         } catch (VCHandlerNotExistException e) {
             return Result.buildFailure(VC_HANDLE_NOT_EXIST);
         }
@@ -300,7 +300,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
         } catch (NotExistException nee) {
             return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
         } catch (Exception e) {
-            log.error("method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e);
+            log.error(
+                    "class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!",
+                    offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
+            );
 
             return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
         }
@@ -355,7 +358,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
         } catch (NotExistException nee) {
             return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
         } catch (Exception e) {
-            log.error("method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e);
+            log.error(
+                    "class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!",
+                    offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
+            );
 
             return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
         } finally {
@@ -64,11 +64,13 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
     public static final String CLUSTER_METRIC_BYTES_OUT = "BytesOut";
     public static final String CLUSTER_METRIC_BYTES_OUT_5_MIN = "BytesOut_min_5";
     public static final String CLUSTER_METRIC_BYTES_OUT_15_MIN = "BytesOut_min_15";
 
     public static final String CLUSTER_METRIC_GROUP = "Groups";
     public static final String CLUSTER_METRIC_GROUP_ACTIVES = "GroupActives";
     public static final String CLUSTER_METRIC_GROUP_EMPTYS = "GroupEmptys";
     public static final String CLUSTER_METRIC_GROUP_REBALANCES = "GroupRebalances";
     public static final String CLUSTER_METRIC_GROUP_DEADS = "GroupDeads";
 
     public static final String CLUSTER_METRIC_ALIVE = "Alive";
 
     public static final String CLUSTER_METRIC_ACL_ENABLE = "AclEnable";
@@ -13,6 +13,7 @@ CREATE TABLE `ks_km_broker` (
   `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活',
   `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+  `endpoint_map` varchar(1024) NOT NULL DEFAULT '' COMMENT '监听信息',
   PRIMARY KEY (`id`),
   UNIQUE KEY `uniq_cluster_phy_id_broker_id` (`cluster_phy_id`,`broker_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表';
@@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 /**
  * DAO that operates directly on the ES cluster
  */
-public class BaseESDAO {
+public abstract class BaseESDAO {
     protected static final ILog LOGGER = LogFactory.getLog("ES_LOGGER");
 
     /**
@@ -11,7 +11,11 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest;
 import com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest;
 import com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse;
 import com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode;
+import com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse;
+import com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse;
+import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse;
 import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
+import com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
 import com.google.common.collect.Lists;
@@ -340,7 +344,94 @@ public class ESOpClient {
         return false;
     }
 
+    /**
+     * Check whether an index matching the expression already exists
+     */
+    public boolean indexExist(String indexName) {
+        ESClient esClient = null;
+        try {
+            esClient = this.getESClientFromPool();
+            if (esClient == null) {
+                return false;
+            }
+
+            // Check whether the index exists
+            return esClient.admin().indices().prepareExists(indexName).execute().actionGet(30, TimeUnit.SECONDS).isExists();
+        } catch (Exception e){
+            LOGGER.warn("class=ESOpClient||method=indexExist||indexName={}||msg=exception!", indexName, e);
+        } finally {
+            if (esClient != null) {
+                returnESClientToPool(esClient);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Create the index
+     */
+    public boolean createIndex(String indexName) {
+        if (indexExist(indexName)) {
+            return true;
+        }
+
+        ESClient client = getESClientFromPool();
+        if (client != null) {
+            try {
+                ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute()
+                        .actionGet(30, TimeUnit.SECONDS);
+                return response.getAcknowledged();
+            } catch (Exception e){
+                LOGGER.warn("msg=create index fail||indexName={}", indexName, e);
+            } finally {
+                returnESClientToPool(client);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Create the index template if it does not exist yet
+     */
+    public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) {
+        ESClient esClient = null;
+
+        try {
+            esClient = this.getESClientFromPool();
+
+            // Fetch the existing index template config from ES
+            ESIndicesGetTemplateResponse getTemplateResponse =
+                    esClient.admin().indices().prepareGetTemplate(indexTemplateName).execute().actionGet(30, TimeUnit.SECONDS);
+
+            TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig();
+
+            if (null != templateConfig) {
+                return true;
+            }
+
+            // Create the new template
+            ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate(indexTemplateName)
+                    .setTemplateConfig(config).execute().actionGet(30, TimeUnit.SECONDS);
+
+            return response.getAcknowledged();
+        } catch (Exception e) {
+            LOGGER.warn(
+                    "class=ESOpClient||method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!",
+                    indexTemplateName, config, e
+            );
+        } finally {
+            if (esClient != null) {
+                this.returnESClientToPool(esClient);
+            }
+        }
+
+        return false;
+    }
+
     /**************************************************** private method ****************************************************/
 
     /**
      * Execute a query
      * @param request
@@ -8,11 +8,12 @@ import com.google.common.collect.Maps;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.search.*;
 import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
 import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO;
-import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
 import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils;
 import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant;
 import lombok.NoArgsConstructor;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.util.CollectionUtils;
 
 import java.util.*;
@@ -25,7 +26,8 @@ public class BaseMetricESDAO extends BaseESDAO {
     /**
      * Name of the index being operated on
      */
     protected String indexName;
+    protected String indexTemplate;
 
     protected static final Long ONE_MIN = 60 * 1000L;
     protected static final Long FIVE_MIN = 5 * ONE_MIN;
@@ -35,10 +37,24 @@ public class BaseMetricESDAO extends BaseESDAO {
     /**
      * Kafka monitoring data for the different dimensions
      */
-    private static Map<KafkaMetricIndexEnum, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
+    private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
             .newConcurrentMap();
 
-    public static BaseMetricESDAO getByStatsType(KafkaMetricIndexEnum statsType) {
+    /**
+     * Check whether today's ES index exists; create it if missing
+     */
+    @Scheduled(cron = "0 3/5 * * * ?")
+    public void checkCurrentDayIndexExist(){
+        String realIndex = IndexNameUtils.genCurrentDailyIndexName(indexName);
+
+        if(esOpClient.indexExist(realIndex)){return;}
+
+        if(esOpClient.createIndexTemplateIfNotExist(indexName, indexTemplate)){
+            esOpClient.createIndex(realIndex);
+        }
+    }
+
+    public static BaseMetricESDAO getByStatsType(String statsType) {
         return ariusStatsEsDaoMap.get(statsType);
     }
 
@@ -48,7 +64,7 @@ public class BaseMetricESDAO extends BaseESDAO {
      * @param statsType
      * @param baseAriusStatsEsDao
      */
-    public static void register(KafkaMetricIndexEnum statsType, BaseMetricESDAO baseAriusStatsEsDao) {
+    public static void register(String statsType, BaseMetricESDAO baseAriusStatsEsDao) {
         ariusStatsEsDaoMap.put(statsType, baseAriusStatsEsDao);
     }
 
@@ -358,7 +374,50 @@ public class BaseMetricESDAO extends BaseESDAO {
         String dsl = dslLoaderUtil.getFormatDslByFileName(DslsConstant.GET_LATEST_METRIC_TIME, startTime, endTime, appendQueryDsl);
         String realIndexName = IndexNameUtils.genDailyIndexName(indexName, startTime, endTime);
 
-        return esOpClient.performRequest(realIndexName, dsl, s -> s.getHits().getHits().isEmpty()
-                ? System.currentTimeMillis() : ((Map<String, Long>)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP), 3);
+        return esOpClient.performRequest(
+                realIndexName,
+                dsl,
+                s -> s == null || s.getHits().getHits().isEmpty() ? System.currentTimeMillis() : ((Map<String, Long>)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP),
+                3
+        );
     }
 
+    /**
+     * Fill gaps in metricPointVOS (missing-point optimization)
+     */
+    protected List<MetricPointVO> optimizeMetricPoints(List<MetricPointVO> metricPointVOS){
+        if(CollectionUtils.isEmpty(metricPointVOS)){return metricPointVOS;}
+
+        int size = metricPointVOS.size();
+        if(size < 2){return metricPointVOS;}
+
+        Collections.sort(metricPointVOS);
+
+        List<MetricPointVO> rets = new ArrayList<>();
+        for(int first = 0, second = first + 1; second < size; first++, second++){
+            MetricPointVO firstPoint = metricPointVOS.get(first);
+            MetricPointVO secondPoint = metricPointVOS.get(second);
+
+            if(null != firstPoint && null != secondPoint){
+                rets.add(firstPoint);
+
+                // There is a gap between the two points, so insert an extra point
+                if(secondPoint.getTimeStamp() - firstPoint.getTimeStamp() > ONE_MIN){
+                    MetricPointVO addPoint = new MetricPointVO();
+                    addPoint.setName(firstPoint.getName());
+                    addPoint.setAggType(firstPoint.getAggType());
+                    addPoint.setValue(firstPoint.getValue());
+                    addPoint.setTimeStamp(firstPoint.getTimeStamp() + ONE_MIN);
+
+                    rets.add(addPoint);
+                }
+
+                if(second == size - 1){
+                    rets.add(secondPoint);
+                }
+            }
+        }
+
+        return rets;
+    }
 }
@@ -18,14 +18,16 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
-import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.BROKER_INFO;
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
 
 @Component
 public class BrokerMetricESDAO extends BaseMetricESDAO {
     @PostConstruct
     public void init() {
-        super.indexName = BROKER_INFO.getIndex();
-        BaseMetricESDAO.register(BROKER_INFO, this);
+        super.indexName = BROKER_INDEX;
+        super.indexTemplate = BROKER_TEMPLATE;
+        checkCurrentDayIndexExist();
+        BaseMetricESDAO.register(indexName, this);
     }
 
     protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500);
@@ -258,7 +260,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
             }
         } );
 
-        metricMap.put(metric, metricPoints);
+        metricMap.put(metric, optimizeMetricPoints(metricPoints));
     }
 
     return metricMap;
@@ -23,15 +23,17 @@ import java.util.List;
 import java.util.Map;
 
 import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
-import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.CLUSTER_INFO;
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
 
 @Component
 public class ClusterMetricESDAO extends BaseMetricESDAO {
 
     @PostConstruct
     public void init() {
-        super.indexName = CLUSTER_INFO.getIndex();
-        BaseMetricESDAO.register(CLUSTER_INFO, this);
+        super.indexName = CLUSTER_INDEX;
+        super.indexTemplate = CLUSTER_TEMPLATE;
+        checkCurrentDayIndexExist();
+        BaseMetricESDAO.register(indexName, this);
     }
 
     protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500);
@@ -207,7 +209,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
             }
         } );
 
-        metricMap.put(metric, metricPoints);
+        metricMap.put(metric, optimizeMetricPoints(metricPoints));
     }
 
     return metricMap;
@@ -23,16 +23,17 @@ import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.constant.Constant.ZERO;
 import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
 import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.KEY;
-import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.GROUP_INFO;
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
 
 @Component
 public class GroupMetricESDAO extends BaseMetricESDAO {
 
     @PostConstruct
     public void init() {
-        super.indexName = GROUP_INFO.getIndex();
-        BaseMetricESDAO.register(GROUP_INFO, this);
+        super.indexName = GROUP_INDEX;
+        super.indexTemplate = GROUP_TEMPLATE;
+        checkCurrentDayIndexExist();
+        BaseMetricESDAO.register(indexName, this);
     }
 
     protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500);
@@ -206,7 +207,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
             }
         } );
 
-        metricMap.put(metric, metricPoints);
+        metricMap.put(metric, optimizeMetricPoints(metricPoints));
     }
 
     return metricMap;
@@ -8,7 +8,7 @@ import javax.annotation.PostConstruct;
 
 import java.util.List;
 
-import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.PARTITION_INFO;
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
 
 /**
  * @author didi
@@ -18,8 +18,10 @@ public class PartitionMetricESDAO extends BaseMetricESDAO {
 
     @PostConstruct
     public void init() {
-        super.indexName = PARTITION_INFO.getIndex();
-        BaseMetricESDAO.register(PARTITION_INFO, this);
+        super.indexName = PARTITION_INDEX;
+        super.indexTemplate = PARTITION_TEMPLATE;
+        checkCurrentDayIndexExist();
+        BaseMetricESDAO.register(indexName, this);
     }
 
     public PartitionMetricPO getPartitionLatestMetrics(Long clusterPhyId, String topic,
@@ -14,7 +14,7 @@ import java.util.List;
 import java.util.Map;
 
 import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE;
-import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.REPLICATION_INFO;
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
 
 /**
  * @author didi
@@ -24,8 +24,10 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO {
 
     @PostConstruct
     public void init() {
-        super.indexName = REPLICATION_INFO.getIndex();
-        BaseMetricESDAO.register(REPLICATION_INFO, this);
+        super.indexName = REPLICATION_INDEX;
+        super.indexTemplate = REPLICATION_TEMPLATE;
+        checkCurrentDayIndexExist();
+        BaseMetricESDAO.register(indexName, this);
     }
 
     /**
@@ -22,15 +22,17 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
-import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.TOPIC_INFO;
+import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
 
 @Component
 public class TopicMetricESDAO extends BaseMetricESDAO {
 
     @PostConstruct
     public void init() {
-        super.indexName = TOPIC_INFO.getIndex();
-        BaseMetricESDAO.register(TOPIC_INFO, this);
+        super.indexName = TOPIC_INDEX;
+        super.indexTemplate = TOPIC_TEMPLATE;
+        checkCurrentDayIndexExist();
+        BaseMetricESDAO.register(indexName, this);
     }
 
     protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500);
@@ -352,7 +354,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
             }
         } );
 
-        metricMap.put(metric, metricPoints);
+        metricMap.put(metric, optimizeMetricPoints(metricPoints));
     }
 
     return metricMap;
@@ -165,8 +165,8 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
                 clusterPhy.getId(),
                 brokerId,
                 broker.getStartTimestamp(),
-                broker.getHost(),
-                broker.getJmxPort() != null? broker.getJmxPort(): jmxConfig.getJmxPort(),
+                jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(),
+                broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(),
                 jmxConfig
         );
 
@@ -191,6 +191,6 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
         lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE);
 
         BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper);
-        return ConvertUtil.obj2Obj(brokerPO, Broker.class);
+        return Broker.buildFrom(brokerPO);
     }
 }
@@ -6,5 +6,4 @@ import org.springframework.stereotype.Repository;
 
 @Repository
 public interface BrokerDAO extends BaseMapper<BrokerPO> {
-    int replace(BrokerPO brokerPO);
 }
@@ -14,12 +14,7 @@
         <result column="jmx_port" property="jmxPort" />
         <result column="start_timestamp" property="startTimestamp" />
         <result column="status" property="status" />
+        <result column="endpoint_map" property="endpointMap"/>
     </resultMap>
 
-    <insert id="replace" parameterType="com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO">
-        REPLACE ks_km_broker
-        (cluster_phy_id, broker_id, host, port, jmx_port, start_timestamp, status, update_time)
-        VALUES
-        (#{clusterPhyId}, #{brokerId}, #{host}, #{port}, #{jmxPort}, #{startTimestamp}, #{status}, #{updateTime})
-    </insert>
 </mapper>
@@ -20,8 +20,8 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
 
     @Test
     public void listClusterMetricsByClusterIdsTest(){
-        List<String> metrics = Arrays.asList("BytesIn_min_1", "BytesOut_min_1");
-        List<Long> clusterIds = Arrays.asList(123L);
+        List<String> metrics = Arrays.asList("MessagesIn");
+        List<Long> clusterIds = Arrays.asList(293L);
         Long endTime = System.currentTimeMillis();
         Long startTime = endTime - 4 * 60 * 60 * 1000;
@@ -16,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
 import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
 import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
 import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
+import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -24,11 +25,18 @@ import java.util.*;
 
 @NoArgsConstructor
 @AllArgsConstructor
-@Task(name = "HealthCheckTask", description = "健康检查", cron = "0 0/1 * * * ? *",
-        autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60)
+@Task(name = "HealthCheckTask",
+        description = "健康检查",
+        cron = "0 0/1 * * * ? *",
+        autoRegister = true,
+        consensual = ConsensualEnum.BROADCAST,
+        timeout = 2 * 60)
 public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
     private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
 
+    @Autowired
+    private TaskThreadPoolService taskThreadPoolService;
+
     @Autowired
     private HealthCheckResultService healthCheckResultService;
 
@@ -38,6 +46,16 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
 
     @Override
     public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        taskThreadPoolService.submitHeavenTask(
+                String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
+                100000,
+                () -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs)
+        );
+
+        return TaskResult.SUCCESS;
+    }
+
+    private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
         // Load the configs: <config name, config info>
         Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
 
@@ -73,8 +91,6 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
         } catch (Exception e) {
             log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
         }
-
-        return TaskResult.SUCCESS;
     }
 
     private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
@@ -10,7 +10,7 @@ import javax.annotation.PostConstruct;
 /**
  * To avoid, as far as possible, heavy tasks being executed by LogIJob's threads,
  * the Task module needs its own thread pool to run its work;
- * FutureUtilsService is responsible for executing those tasks.
+ * TaskThreadPoolService is responsible for executing those tasks.
  */
 @Service
 @NoArgsConstructor