diff --git a/Releases_Notes.md b/Releases_Notes.md index d2009a5e..8d19e266 100644 --- a/Releases_Notes.md +++ b/Releases_Notes.md @@ -1,6 +1,42 @@ +## v3.0.0-beta.1 + +**文档** +- 新增Task模块说明文档 +- FAQ补充 `Specified key was too long; max key length is 767 bytes ` 错误说明 +- FAQ补充 `出现ESIndexNotFoundException报错` 错误说明 + -## v3.0.0-beta +**Bug修复** +- 修复 Consumer 点击 Stop 未停止检索的问题 +- 修复创建/编辑角色权限报错问题 +- 修复多集群管理/单集群详情均衡卡片状态错误问题 +- 修复版本列表未排序问题 +- 修复Raft集群Controller信息不断记录问题 +- 修复部分版本消费组描述信息获取失败问题 +- 修复分区Offset获取失败的日志中,缺少Topic名称信息问题 +- 修复GitHub图地址错误,及图裂问题 +- 修复Broker默认使用的地址和注释不一致问题 +- 修复 Consumer 列表分页不生效问题 +- 修复操作记录表operation_methods字段缺少默认值问题 +- 修复集群均衡表中move_broker_list字段无效的问题 +- 修复KafkaUser、KafkaACL信息获取时,日志一直重复提示不支持问题 +- 修复指标缺失时,曲线出现掉底的问题 + + +**体验优化** +- 优化前端构建时间和打包体积,增加依赖打包的分包策略 +- 优化产品样式和文案展示 +- 优化ES客户端数为可配置 +- 优化日志中大量出现的MySQL Key冲突日志 + + +**能力提升** +- 增加周期任务,用于主动创建缺少的ES模版及索引的能力,减少额外的脚本操作 +- 增加JMX连接的Broker地址可选择的能力 + + +## v3.0.0-beta.0 **1、多集群管理** diff --git a/docs/dev_guide/KnowStreaming Task模块简介.md b/docs/dev_guide/Task模块简介.md similarity index 99% rename from docs/dev_guide/KnowStreaming Task模块简介.md rename to docs/dev_guide/Task模块简介.md index 59b5987c..688e033b 100644 --- a/docs/dev_guide/KnowStreaming Task模块简介.md +++ b/docs/dev_guide/Task模块简介.md @@ -1,4 +1,4 @@ -# KnowStreaming Task模块简介 +# Task模块简介 ## 1、Task简介 diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index 621d90bc..8c23b9be 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -1,6 +1,35 @@ ## 6.2、版本升级手册 -**`2.x`版本 升级至 `3.0.0`版本** +注意:如果想升级至具体版本,需要将你当前版本至你期望使用版本的变更统统执行一遍,然后才能正常使用。 + +### 6.2.0、升级至 `master` 版本 + +暂无 + +--- + +### 6.2.1、升级至 `v3.0.0-beta.1`版本 + + +**SQL变更** + +1、在`ks_km_broker`表增加了一个监听信息字段。 +2、为`logi_security_oplog`表operation_methods字段设置默认值''。 +因此需要执行下面的sql对数据库表进行更新。 + +```sql +ALTER TABLE `ks_km_broker` +ADD COLUMN `endpoint_map` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '监听信息' AFTER `update_time`; + +ALTER TABLE `logi_security_oplog` +ALTER COLUMN `operation_methods` set default ''; + +``` + +--- + + +### 6.2.2、`2.x`版本 升级至 `v3.0.0-beta.0`版本 **升级步骤:** diff --git a/docs/user_guide/faq.md b/docs/user_guide/faq.md index 3062cfa2..60620f73 100644 --- a/docs/user_guide/faq.md +++ b/docs/user_guide/faq.md @@ -109,3 +109,21 @@ SECURITY.TRICK_USERS 设置完成上面两步之后,就可以直接调用需要登录的接口了。 但是还有一点需要注意,绕过的用户仅能调用他有权限的接口,比如一个普通用户,那么他就只能调用普通的接口,不能去调用运维人员的接口。 + +## 8.8、Specified key was too long; max key length is 767 bytes + +**原因:**不同版本的InoDB引擎,参数‘innodb_large_prefix’默认值不同,即在5.6默认值为OFF,5.7默认值为ON。 + +对于引擎为InnoDB,innodb_large_prefix=OFF,且行格式为Antelope即支持REDUNDANT或COMPACT时,索引键前缀长度最大为 767 字节。innodb_large_prefix=ON,且行格式为Barracuda即支持DYNAMIC或COMPRESSED时,索引键前缀长度最大为3072字节。 + +**解决方案:** + +- 减少varchar字符大小低于767/4=191。 +- 将字符集改为latin1(一个字符=一个字节)。 +- 开启‘innodb_large_prefix’,修改默认行格式‘innodb_file_format’为Barracuda,并设置row_format=dynamic。 + +## 8.9、出现ESIndexNotFoundEXception报错 + +**原因 :**没有创建ES索引模版 + +**解决方案:**执行init_es_template.sh脚本,创建ES索引模版即可。 diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java index 55944f6f..a94a377d 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java @@ -5,7 +5,6 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*; -import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory; @@ -21,6 +20,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; + @Component public class MetricESSender implements ApplicationListener { protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); @@ -41,37 +42,37 @@ public class MetricESSender implements ApplicationListener { public void onApplicationEvent(BaseMetricEvent event) { if(event instanceof BrokerMetricEvent) { BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event; - send2es(KafkaMetricIndexEnum.BROKER_INFO, + send2es(BROKER_INDEX, ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class) ); } else if(event instanceof ClusterMetricEvent) { ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event; - send2es(KafkaMetricIndexEnum.CLUSTER_INFO, + send2es(CLUSTER_INDEX, ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class) ); } else if(event instanceof TopicMetricEvent) { TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event; - send2es(KafkaMetricIndexEnum.TOPIC_INFO, + send2es(TOPIC_INDEX, ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class) ); } else if(event instanceof PartitionMetricEvent) { PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event; - send2es(KafkaMetricIndexEnum.PARTITION_INFO, + send2es(PARTITION_INDEX, ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class) ); } else if(event instanceof GroupMetricEvent) { GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; - send2es(KafkaMetricIndexEnum.GROUP_INFO, + send2es(GROUP_INDEX, ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class) ); } else if(event instanceof ReplicaMetricEvent) { ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; - send2es(KafkaMetricIndexEnum.REPLICATION_INFO, + send2es(REPLICATION_INDEX, ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class) ); } @@ -80,19 +81,19 @@ public class MetricESSender implements ApplicationListener { /** * 根据不同监控维度来发送 */ - private boolean send2es(KafkaMetricIndexEnum stats, List statsList){ + private boolean send2es(String index, List statsList){ if (CollectionUtils.isEmpty(statsList)) { return true; } if (!EnvUtil.isOnline()) { LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}", - stats.getIndex(), statsList.size()); + index, statsList.size()); } - BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(stats); + BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index); if (Objects.isNull( baseMetricESDao )) { - LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", stats.getIndex()); + LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index); return false; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index 916820d8..d7e3b792 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -1,5 +1,10 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.broker; + +import com.alibaba.fastjson.TypeReference; +import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; +import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; import lombok.AllArgsConstructor; import lombok.Data; @@ -7,6 +12,7 @@ import lombok.NoArgsConstructor; import org.apache.kafka.common.Node; import java.io.Serializable; +import java.util.Map; /** * @author didi @@ -55,6 +61,11 @@ public class Broker implements Serializable { */ private Integer status; + /** + * 监听信息 + */ + private Map endpointMap; + public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) { Broker metadata = new Broker(); metadata.setClusterPhyId(clusterPhyId); @@ -78,9 +89,31 @@ public class Broker implements Serializable { metadata.setStartTimestamp(brokerMetadata.getTimestamp()); metadata.setRack(brokerMetadata.getRack()); metadata.setStatus(1); + metadata.setEndpointMap(brokerMetadata.getEndpointMap()); return metadata; } + public static Broker buildFrom(BrokerPO brokerPO) { + Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class); + String endpointMapStr = brokerPO.getEndpointMap(); + if (broker == null || endpointMapStr == null || endpointMapStr.equals("")) { + return broker; + } + + // 填充endpoint信息 + Map endpointMap = ConvertUtil.str2ObjByJson(endpointMapStr, new TypeReference>(){}); + broker.setEndpointMap(endpointMap); + return broker; + } + + public String getJmxHost(String endPoint) { + if (endPoint == null || endpointMap == null) { + return host; + } + IpPortData ip = endpointMap.get(endPoint); + return ip == null ? ip.getIp() : host; + } + public boolean alive() { return status != null && status > 0; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java index 5c78183c..87607c1f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java @@ -27,6 +27,9 @@ public class JmxConfig implements Serializable { @ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19") private String token; + + @ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL") + private String useWhichEndpoint; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java index f342d64c..02907a6c 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java @@ -1,6 +1,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.kafka.clients.admin.OffsetSpec; @@ -10,13 +10,13 @@ import java.util.Map; @Data @NoArgsConstructor -public class PartitionOffsetParam extends ClusterPhyParam { +public class PartitionOffsetParam extends TopicParam { private Map topicPartitionOffsets; private Long timestamp; - public PartitionOffsetParam(Long clusterPhyId, Map topicPartitionOffsets, Long timestamp) { - super(clusterPhyId); + public PartitionOffsetParam(Long clusterPhyId, String topicName, Map topicPartitionOffsets, Long timestamp) { + super(clusterPhyId, topicName); this.topicPartitionOffsets = topicPartitionOffsets; this.timestamp = timestamp; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java index d6a6b516..5c66a521 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java @@ -15,4 +15,12 @@ public class TopicParam extends ClusterPhyParam { super(clusterPhyId); this.topicName = topicName; } + + @Override + public String toString() { + return "TopicParam{" + + "clusterPhyId=" + clusterPhyId + + ", topicName='" + topicName + '\'' + + '}'; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java index 2f50480d..16f98d88 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java @@ -42,4 +42,9 @@ public class BrokerPO extends BasePO { * Broker状态 */ private Integer status; + + /** + * 监听信息 + */ + private String endpointMap; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java index 1dc894f7..c647b222 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java @@ -29,6 +29,10 @@ public class MetricPointVO implements Comparable { @Override public int compareTo(MetricPointVO o) { if(null == o){return 0;} + if(null == this.getTimeStamp() + || null == o.getTimeStamp()){ + return 0; + } return this.getTimeStamp().intValue() - o.getTimeStamp().intValue(); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java new file mode 100644 index 00000000..0de516f7 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java @@ -0,0 +1,647 @@ +package com.xiaojukeji.know.streaming.km.common.constant; + +public class ESIndexConstant { + + public final static String TOPIC_INDEX = "ks_kafka_topic_metric"; + public final static String TOPIC_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_topic_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"BytesIn_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesRejected\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"PartitionURP\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"ReplicationCount\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"ReplicationBytesOut\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"ReplicationBytesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"FailedFetchRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"LogSize\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"FailedProduceRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"MessagesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalProduceRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"brokerAgg\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String CLUSTER_INDEX = "ks_kafka_cluster_metric"; + public final static String CLUSTER_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_cluster_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"Connections\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesIn_min_15\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionURP\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"EventQueueSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"ActiveControllerCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupDeads\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesIn_min_5\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Partitions\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesOut\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesOut_min_15\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalRequestQueueSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalProduceRequests\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalLogSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupEmptys\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionNoLeader\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionMinISR_E\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Replicas\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupRebalances\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MessageIn\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionMinISR_S\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesIn\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesOut_min_5\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupActives\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MessagesIn\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupReBalances\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalResponseQueueSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Zookeepers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"LeaderMessages\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Cluster\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Cluster\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Cluster\" : {\n" + + " \"type\" : \"double\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"type\" : \"date\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String BROKER_INDEX = "ks_kafka_broker_metric"; + public final static String BROKER_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_broker_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"NetworkProcessorAvgIdle\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"UnderReplicatedPartitions\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"RequestHandlerAvgIdle\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"connectionsCount\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalRequestQueueSize\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"MessagesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalProduceRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalResponseQueueSize\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String PARTITION_INDEX = "ks_kafka_partition_metric"; + public final static String PARTITION_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_partition_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"partitionId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"LogStartOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"LogEndOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String GROUP_INDEX = "ks_kafka_group_metric"; + public final static String GROUP_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_group_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"group\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"partitionId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Lag\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"OffsetConsumed\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"groupMetric\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String REPLICATION_INDEX = "ks_kafka_replication_metric"; + public final static String REPLICATION_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_partition_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"partitionId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"LogStartOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"LogEndOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }[root@10-255-0-23 template]# cat ks_kafka_replication_metric\n" + + "PUT _template/ks_kafka_replication_metric\n" + + "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_replication_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java index c6205152..3b768e01 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java @@ -33,7 +33,7 @@ public class KafkaConstant { public static final Integer DATA_VERSION_ONE = 1; - public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 3000; + public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 5000; public static final Integer KAFKA_SASL_SCRAM_ITERATIONS = 8192; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java deleted file mode 100644 index 25535864..00000000 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.xiaojukeji.know.streaming.km.common.enums.metric; - -/** - * @author: D10865 - * @description: - * @date: Create on 2019/3/11 下午2:19 - * @modified By D10865 - * - * 不同维度的es监控数据 - */ -public enum KafkaMetricIndexEnum { - - /** - * topic 维度 - */ - TOPIC_INFO("ks_kafka_topic_metric"), - - /** - * 集群 维度 - */ - CLUSTER_INFO("ks_kafka_cluster_metric"), - - /** - * broker 维度 - */ - BROKER_INFO("ks_kafka_broker_metric"), - - /** - * partition 维度 - */ - PARTITION_INFO("ks_kafka_partition_metric"), - - /** - * group 维度 - */ - GROUP_INFO("ks_kafka_group_metric"), - - /** - * replication 维度 - */ - REPLICATION_INFO("ks_kafka_replication_metric"), - - ; - - private String index; - - KafkaMetricIndexEnum(String index) { - this.index = index; - } - - public String getIndex() { - return index; - } -} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index e9f8c933..dc702388 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -130,6 +130,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok // 如果当前Broker还存活,则更新DB信息 BrokerPO newBrokerPO = ConvertUtil.obj2Obj(presentAliveBroker, BrokerPO.class); + if (presentAliveBroker.getEndpointMap() != null) { + newBrokerPO.setEndpointMap(ConvertUtil.obj2Json(presentAliveBroker.getEndpointMap())); + } newBrokerPO.setId(inDBBrokerPO.getId()); newBrokerPO.setStatus(Constant.ALIVE); newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime()); @@ -203,7 +206,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok lambdaQueryWrapper.eq(BrokerPO::getClusterPhyId, clusterPhyId); lambdaQueryWrapper.eq(BrokerPO::getBrokerId, brokerId); - return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class); + return Broker.buildFrom(brokerDAO.selectOne(lambdaQueryWrapper)); } @Override @@ -272,9 +275,8 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok /**************************************************** private method ****************************************************/ private List listAllBrokersAndUpdateCache(Long clusterPhyId) { - List allBrokerList = ConvertUtil.list2List(this.getAllBrokerPOsFromDB(clusterPhyId), Broker.class); + List allBrokerList = getAllBrokerPOsFromDB(clusterPhyId).stream().map(elem -> Broker.buildFrom(elem)).collect(Collectors.toList()); brokersCache.put(clusterPhyId, allBrokerList); - return allBrokerList; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index da12bf95..b029330c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -102,7 +102,10 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId); try { - DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(Arrays.asList(groupName), new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(true)); + DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups( + Arrays.asList(groupName), + new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(false) + ); return describeConsumerGroupsResult.all().get().get(groupName); } catch(Exception e){ diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index 92b54c16..13eedb41 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -207,7 +207,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); try { - return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp)); + return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); } catch (VCHandlerNotExistException e) { return Result.buildFailure(VC_HANDLE_NOT_EXIST); } @@ -226,7 +226,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); try { - return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp)); + return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); } catch (VCHandlerNotExistException e) { return Result.buildFailure(VC_HANDLE_NOT_EXIST); } @@ -300,7 +300,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P } catch (NotExistException nee) { return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId())); } catch (Exception e) { - log.error("method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e); + log.error( + "class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!", + offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e + ); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } @@ -355,7 +358,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P } catch (NotExistException nee) { return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId())); } catch (Exception e) { - log.error("method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e); + log.error( + "class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!", + offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e + ); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } finally { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java index d3357ab4..53b98479 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java @@ -64,11 +64,13 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric { public static final String CLUSTER_METRIC_BYTES_OUT = "BytesOut"; public static final String CLUSTER_METRIC_BYTES_OUT_5_MIN = "BytesOut_min_5"; public static final String CLUSTER_METRIC_BYTES_OUT_15_MIN = "BytesOut_min_15"; + public static final String CLUSTER_METRIC_GROUP = "Groups"; public static final String CLUSTER_METRIC_GROUP_ACTIVES = "GroupActives"; public static final String CLUSTER_METRIC_GROUP_EMPTYS = "GroupEmptys"; public static final String CLUSTER_METRIC_GROUP_REBALANCES = "GroupRebalances"; public static final String CLUSTER_METRIC_GROUP_DEADS = "GroupDeads"; + public static final String CLUSTER_METRIC_ALIVE = "Alive"; public static final String CLUSTER_METRIC_ACL_ENABLE = "AclEnable"; diff --git a/km-dist/init/sql/ddl-ks-km.sql b/km-dist/init/sql/ddl-ks-km.sql index 96de4ed1..50696917 100644 --- a/km-dist/init/sql/ddl-ks-km.sql +++ b/km-dist/init/sql/ddl-ks-km.sql @@ -13,6 +13,7 @@ CREATE TABLE `ks_km_broker` ( `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `endpoint_map` varchar(1024) NOT NULL DEFAULT '' COMMENT '监听信息', PRIMARY KEY (`id`), UNIQUE KEY `uniq_cluster_phy_id_broker_id` (`cluster_phy_id`,`broker_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表'; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java index dff96236..62bc6a57 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java @@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired; /** * 直接操作es集群的dao */ -public class BaseESDAO { +public abstract class BaseESDAO { protected static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); /** diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index c611c538..1200699a 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -11,7 +11,11 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest; import com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest; import com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse; import com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode; +import com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse; +import com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse; +import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse; import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; +import com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; @@ -340,7 +344,94 @@ public class ESOpClient { return false; } + /** + * 根据表达式判断索引是否已存在 + */ + public boolean indexExist(String indexName) { + ESClient esClient = null; + try { + esClient = this.getESClientFromPool(); + if (esClient == null) { + return false; + } + + // 检查索引是否存在 + return esClient.admin().indices().prepareExists(indexName).execute().actionGet(30, TimeUnit.SECONDS).isExists(); + } catch (Exception e){ + LOGGER.warn("class=ESOpClient||method=indexExist||indexName={}||msg=exception!", indexName, e); + } finally { + if (esClient != null) { + returnESClientToPool(esClient); + } + } + + return false; + } + + /** + * 创建索引 + */ + public boolean createIndex(String indexName) { + if (indexExist(indexName)) { + return true; + } + + ESClient client = getESClientFromPool(); + if (client != null) { + try { + ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute() + .actionGet(30, TimeUnit.SECONDS); + return response.getAcknowledged(); + } catch (Exception e){ + LOGGER.warn( "msg=create index fail||indexName={}", indexName, e); + } finally { + returnESClientToPool(client); + } + } + + return false; + } + + /** + * 创建索引模板 + */ + public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) { + ESClient esClient = null; + + try { + esClient = this.getESClientFromPool(); + + // 获取es中原来index template的配置 + ESIndicesGetTemplateResponse getTemplateResponse = + esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( 30, TimeUnit.SECONDS ); + + TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig(); + + if (null != templateConfig) { + return true; + } + + // 创建新的模板 + ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName ) + .setTemplateConfig( config ).execute().actionGet( 30, TimeUnit.SECONDS ); + + return response.getAcknowledged(); + } catch (Exception e) { + LOGGER.warn( + "class=ESOpClient||method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!", + indexTemplateName, config, e + ); + } finally { + if (esClient != null) { + this.returnESClientToPool(esClient); + } + } + + return false; + } + /**************************************************** private method ****************************************************/ + /** * 执行查询 * @param request diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java index 8a0f96a9..fe04e4d1 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java @@ -8,11 +8,12 @@ import com.google.common.collect.Maps; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.*; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO; -import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils; import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import lombok.NoArgsConstructor; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.util.CollectionUtils; import java.util.*; @@ -25,7 +26,8 @@ public class BaseMetricESDAO extends BaseESDAO { /** * 操作的索引名称 */ - protected String indexName; + protected String indexName; + protected String indexTemplate; protected static final Long ONE_MIN = 60 * 1000L; protected static final Long FIVE_MIN = 5 * ONE_MIN; @@ -35,10 +37,24 @@ public class BaseMetricESDAO extends BaseESDAO { /** * 不同维度 kafka 监控数据 */ - private static Map ariusStatsEsDaoMap = Maps + private static Map ariusStatsEsDaoMap = Maps .newConcurrentMap(); - public static BaseMetricESDAO getByStatsType(KafkaMetricIndexEnum statsType) { + /** + * 检查 es 索引是否存在,不存在则创建索引 + */ + @Scheduled(cron = "0 3/5 * * * ?") + public void checkCurrentDayIndexExist(){ + String realIndex = IndexNameUtils.genCurrentDailyIndexName(indexName); + + if(esOpClient.indexExist(realIndex)){return;} + + if(esOpClient.createIndexTemplateIfNotExist(indexName, indexTemplate)){ + esOpClient.createIndex(realIndex); + } + } + + public static BaseMetricESDAO getByStatsType(String statsType) { return ariusStatsEsDaoMap.get(statsType); } @@ -48,7 +64,7 @@ public class BaseMetricESDAO extends BaseESDAO { * @param statsType * @param baseAriusStatsEsDao */ - public static void register(KafkaMetricIndexEnum statsType, BaseMetricESDAO baseAriusStatsEsDao) { + public static void register(String statsType, BaseMetricESDAO baseAriusStatsEsDao) { ariusStatsEsDaoMap.put(statsType, baseAriusStatsEsDao); } @@ -358,7 +374,50 @@ public class BaseMetricESDAO extends BaseESDAO { String dsl = dslLoaderUtil.getFormatDslByFileName(DslsConstant.GET_LATEST_METRIC_TIME, startTime, endTime, appendQueryDsl); String realIndexName = IndexNameUtils.genDailyIndexName(indexName, startTime, endTime); - return esOpClient.performRequest(realIndexName, dsl, s -> s.getHits().getHits().isEmpty() - ? System.currentTimeMillis() : ((Map)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP), 3); + return esOpClient.performRequest( + realIndexName, + dsl, + s -> s == null || s.getHits().getHits().isEmpty() ? System.currentTimeMillis() : ((Map)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP), + 3 + ); + } + + /** + * 对 metricPointVOS 进行缺点优化 + */ + protected List optimizeMetricPoints(List metricPointVOS){ + if(CollectionUtils.isEmpty(metricPointVOS)){return metricPointVOS;} + + int size = metricPointVOS.size(); + if(size < 2){return metricPointVOS;} + + Collections.sort(metricPointVOS); + + List rets = new ArrayList<>(); + for(int first = 0, second = first + 1; second < size; first++, second++){ + MetricPointVO firstPoint = metricPointVOS.get(first); + MetricPointVO secondPoint = metricPointVOS.get(second); + + if(null != firstPoint && null != secondPoint){ + rets.add(firstPoint); + + //说明有空点,那就增加一个点 + if(secondPoint.getTimeStamp() - firstPoint.getTimeStamp() > ONE_MIN){ + MetricPointVO addPoint = new MetricPointVO(); + addPoint.setName(firstPoint.getName()); + addPoint.setAggType(firstPoint.getAggType()); + addPoint.setValue(firstPoint.getValue()); + addPoint.setTimeStamp(firstPoint.getTimeStamp() + ONE_MIN); + + rets.add(addPoint); + } + + if(second == size - 1){ + rets.add(secondPoint); + } + } + } + + return rets; } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java index b80c1ca0..edc186f4 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java @@ -18,14 +18,16 @@ import java.util.*; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.BROKER_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class BrokerMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = BROKER_INFO.getIndex(); - BaseMetricESDAO.register(BROKER_INFO, this); + super.indexName = BROKER_INDEX; + super.indexTemplate = BROKER_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500); @@ -258,7 +260,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java index 63a9f3f1..82a86253 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java @@ -23,15 +23,17 @@ import java.util.List; import java.util.Map; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.CLUSTER_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class ClusterMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = CLUSTER_INFO.getIndex(); - BaseMetricESDAO.register(CLUSTER_INFO, this); + super.indexName = CLUSTER_INDEX; + super.indexTemplate = CLUSTER_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500); @@ -207,7 +209,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java index 42ae0ace..cf65e6ef 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java @@ -23,16 +23,17 @@ import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.constant.Constant.ZERO; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.KEY; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.GROUP_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class GroupMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = GROUP_INFO.getIndex(); - BaseMetricESDAO.register(GROUP_INFO, this); + super.indexName = GROUP_INDEX; + super.indexTemplate = GROUP_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500); @@ -206,7 +207,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java index 85dc55df..4f86852b 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java @@ -8,7 +8,7 @@ import javax.annotation.PostConstruct; import java.util.List; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.PARTITION_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; /** * @author didi @@ -18,8 +18,10 @@ public class PartitionMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = PARTITION_INFO.getIndex(); - BaseMetricESDAO.register(PARTITION_INFO, this); + super.indexName = PARTITION_INDEX; + super.indexTemplate = PARTITION_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } public PartitionMetricPO getPartitionLatestMetrics(Long clusterPhyId, String topic, diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java index e5f9f164..1f604cc0 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.Map; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.REPLICATION_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; /** * @author didi @@ -24,8 +24,10 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = REPLICATION_INFO.getIndex(); - BaseMetricESDAO.register(REPLICATION_INFO, this); + super.indexName = REPLICATION_INDEX; + super.indexTemplate = REPLICATION_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } /** diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java index 402333ee..e9089c17 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java @@ -22,15 +22,17 @@ import java.util.*; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.TOPIC_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class TopicMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = TOPIC_INFO.getIndex(); - BaseMetricESDAO.register(TOPIC_INFO, this); + super.indexName = TOPIC_INDEX; + super.indexTemplate = TOPIC_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500); @@ -352,7 +354,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java index afa904af..68d1011e 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java @@ -165,8 +165,8 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { clusterPhy.getId(), brokerId, broker.getStartTimestamp(), - broker.getHost(), - broker.getJmxPort() != null? broker.getJmxPort(): jmxConfig.getJmxPort(), + jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(), + broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(), jmxConfig ); @@ -191,6 +191,6 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE); BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper); - return ConvertUtil.obj2Obj(brokerPO, Broker.class); + return Broker.buildFrom(brokerPO); } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java index c05a66ad..5169bbad 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java @@ -6,5 +6,4 @@ import org.springframework.stereotype.Repository; @Repository public interface BrokerDAO extends BaseMapper { - int replace(BrokerPO brokerPO); } diff --git a/km-persistence/src/main/resources/mybatis/BrokerMapper.xml b/km-persistence/src/main/resources/mybatis/BrokerMapper.xml index 360fe9c8..3d9f5d8f 100644 --- a/km-persistence/src/main/resources/mybatis/BrokerMapper.xml +++ b/km-persistence/src/main/resources/mybatis/BrokerMapper.xml @@ -14,12 +14,7 @@ + - - REPLACE ks_km_broker - (cluster_phy_id, broker_id, host, port, jmx_port, start_timestamp, status, update_time) - VALUES - (#{clusterPhyId}, #{brokerId}, #{host}, #{port}, #{jmxPort}, #{startTimestamp}, #{status}, #{updateTime}) - diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java index 2cdb895e..c69f7129 100644 --- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java +++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java @@ -20,8 +20,8 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest { @Test public void listClusterMetricsByClusterIdsTest(){ - List metrics = Arrays.asList("BytesIn_min_1", "BytesOut_min_1"); - List clusterIds = Arrays.asList(123L); + List metrics = Arrays.asList("MessagesIn"); + List clusterIds = Arrays.asList(293L); Long endTime = System.currentTimeMillis(); Long startTime = endTime - 4 * 60 * 60 * 1000; diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java index 0d4c8db9..3e661418 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java @@ -16,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; @@ -24,11 +25,18 @@ import java.util.*; @NoArgsConstructor @AllArgsConstructor -@Task(name = "HealthCheckTask", description = "健康检查", cron = "0 0/1 * * * ? *", - autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) +@Task(name = "HealthCheckTask", + description = "健康检查", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) public class HealthCheckTask extends AbstractClusterPhyDispatchTask { private static final ILog log = LogFactory.getLog(HealthCheckTask.class); + @Autowired + private TaskThreadPoolService taskThreadPoolService; + @Autowired private HealthCheckResultService healthCheckResultService; @@ -38,6 +46,16 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask { @Override public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + taskThreadPoolService.submitHeavenTask( + String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), + 100000, + () -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs) + ); + + return TaskResult.SUCCESS; + } + + private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { // 获取配置,<配置名,配置信息> Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); @@ -73,8 +91,6 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask { } catch (Exception e) { log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); } - - return TaskResult.SUCCESS; } private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java index 5e7d222a..884a572d 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/TaskThreadPoolService.java @@ -10,7 +10,7 @@ import javax.annotation.PostConstruct; /** * 为了尽量避免大任务的执行,由LogIJob的线程执行, * 因此,在Task模块,需要有自己的线程池来执行相关任务, - * 而 FutureUtilsService 的职责就是负责任务的执行。 + * 而 TaskThreadPoolService 的职责就是负责任务的执行。 */ @Service @NoArgsConstructor