mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-07 15:12:14 +08:00
Merge pull request #531 from didi/dev_v3.0.0
1、后端补充leader选举能力;2、图片链接调整;3、健康检查文案调整;4、版本列表增加排序;5、指标采集缓存时间调整;
This commit is contained in:
@@ -19,7 +19,7 @@
|
|||||||
|
|
||||||
未开启时,直接到`2、解决方法`查看如何开启即可。
|
未开启时,直接到`2、解决方法`查看如何开启即可。
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
|
|
||||||
**类型二:配置错误**
|
**类型二:配置错误**
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
|
||||||
|
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class BatchPartitionParam extends ClusterPhyParam {
|
||||||
|
private List<TopicPartition> tpList;
|
||||||
|
|
||||||
|
public BatchPartitionParam(Long clusterPhyId, List<TopicPartition> tpList) {
|
||||||
|
super(clusterPhyId);
|
||||||
|
this.tpList = tpList;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -26,7 +26,7 @@ public enum HealthCheckNameEnum {
|
|||||||
HealthCheckDimensionEnum.CLUSTER,
|
HealthCheckDimensionEnum.CLUSTER,
|
||||||
"Controller",
|
"Controller",
|
||||||
Constant.HC_CONFIG_NAME_PREFIX + "CLUSTER_NO_CONTROLLER",
|
Constant.HC_CONFIG_NAME_PREFIX + "CLUSTER_NO_CONTROLLER",
|
||||||
"集群Controller数错误",
|
"集群Controller数正常",
|
||||||
HealthCompareValueConfig.class
|
HealthCompareValueConfig.class
|
||||||
),
|
),
|
||||||
|
|
||||||
@@ -34,7 +34,7 @@ public enum HealthCheckNameEnum {
|
|||||||
HealthCheckDimensionEnum.BROKER,
|
HealthCheckDimensionEnum.BROKER,
|
||||||
"RequestQueueSize",
|
"RequestQueueSize",
|
||||||
Constant.HC_CONFIG_NAME_PREFIX + "BROKER_REQUEST_QUEUE_FULL",
|
Constant.HC_CONFIG_NAME_PREFIX + "BROKER_REQUEST_QUEUE_FULL",
|
||||||
"Broker-RequestQueueSize被打满",
|
"Broker-RequestQueueSize指标",
|
||||||
HealthCompareValueConfig.class
|
HealthCompareValueConfig.class
|
||||||
),
|
),
|
||||||
|
|
||||||
@@ -42,7 +42,7 @@ public enum HealthCheckNameEnum {
|
|||||||
HealthCheckDimensionEnum.BROKER,
|
HealthCheckDimensionEnum.BROKER,
|
||||||
"NetworkProcessorAvgIdlePercent",
|
"NetworkProcessorAvgIdlePercent",
|
||||||
Constant.HC_CONFIG_NAME_PREFIX + "BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW",
|
Constant.HC_CONFIG_NAME_PREFIX + "BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW",
|
||||||
"Broker-NetworkProcessorAvgIdlePercent的Idle过低",
|
"Broker-NetworkProcessorAvgIdlePercent指标",
|
||||||
HealthCompareValueConfig.class
|
HealthCompareValueConfig.class
|
||||||
),
|
),
|
||||||
|
|
||||||
@@ -50,7 +50,7 @@ public enum HealthCheckNameEnum {
|
|||||||
HealthCheckDimensionEnum.GROUP,
|
HealthCheckDimensionEnum.GROUP,
|
||||||
"Group Re-Balance",
|
"Group Re-Balance",
|
||||||
Constant.HC_CONFIG_NAME_PREFIX + "GROUP_RE_BALANCE_TOO_FREQUENTLY",
|
Constant.HC_CONFIG_NAME_PREFIX + "GROUP_RE_BALANCE_TOO_FREQUENTLY",
|
||||||
"Group re-balance太频繁",
|
"Group re-balance频率",
|
||||||
HealthDetectedInLatestMinutesConfig.class
|
HealthDetectedInLatestMinutesConfig.class
|
||||||
),
|
),
|
||||||
|
|
||||||
@@ -66,7 +66,7 @@ public enum HealthCheckNameEnum {
|
|||||||
HealthCheckDimensionEnum.TOPIC,
|
HealthCheckDimensionEnum.TOPIC,
|
||||||
"UnderReplicaTooLong",
|
"UnderReplicaTooLong",
|
||||||
Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_UNDER_REPLICA_TOO_LONG",
|
Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_UNDER_REPLICA_TOO_LONG",
|
||||||
"Topic 长期处于未同步状态",
|
"Topic 未同步持续时间",
|
||||||
HealthDetectedInLatestMinutesConfig.class
|
HealthDetectedInLatestMinutesConfig.class
|
||||||
),
|
),
|
||||||
|
|
||||||
|
|||||||
@@ -31,9 +31,11 @@ public enum VersionItemTypeEnum {
|
|||||||
|
|
||||||
|
|
||||||
SERVICE_OP_PARTITION(320, "service_partition_operation"),
|
SERVICE_OP_PARTITION(320, "service_partition_operation"),
|
||||||
|
SERVICE_OP_PARTITION_LEADER(321, "service_partition-leader_operation"),
|
||||||
|
|
||||||
SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
|
SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 前端操作
|
* 前端操作
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -10,13 +10,13 @@ import java.util.concurrent.TimeUnit;
|
|||||||
|
|
||||||
public class CollectedMetricsLocalCache {
|
public class CollectedMetricsLocalCache {
|
||||||
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
|
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
|
||||||
.expireAfterWrite(60, TimeUnit.SECONDS)
|
.expireAfterWrite(90, TimeUnit.SECONDS)
|
||||||
.maximumSize(2000)
|
.maximumSize(10000)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
|
private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
|
||||||
.expireAfterWrite(90, TimeUnit.SECONDS)
|
.expireAfterWrite(90, TimeUnit.SECONDS)
|
||||||
.maximumSize(5000)
|
.maximumSize(10000)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
|
private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
|
||||||
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
|
|||||||
.maximumSize(20000)
|
.maximumSize(20000)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
|
public static Float getBrokerMetrics(String brokerMetricKey) {
|
||||||
return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
|
return brokerMetricsCache.getIfPresent(brokerMetricKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
|
public static void putBrokerMetrics(String brokerMetricKey, Float value) {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
|
|
||||||
|
brokerMetricsCache.put(brokerMetricKey, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
|
public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
|
||||||
return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
|
return topicMetricsCache.getIfPresent(topicMetricKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
|
public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
|
||||||
if (metricsList == null) {
|
if (metricsList == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
|
|
||||||
|
topicMetricsCache.put(topicMetricKey, metricsList);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
|
public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
|
||||||
return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
|
return partitionMetricsCache.getIfPresent(partitionMetricKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
|
public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
|
||||||
if (metricsList == null) {
|
if (metricsList == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
|
partitionMetricsCache.put(partitionMetricsKey, metricsList);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
|
public static Float getReplicaMetrics(String replicaMetricsKey) {
|
||||||
return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
|
return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
|
public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
|
replicaMetricsValueCache.put(replicaMetricsKey, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
|
||||||
/**************************************************** private method ****************************************************/
|
|
||||||
|
|
||||||
|
|
||||||
private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
|
|
||||||
return clusterPhyId + "@" + brokerId + "@" + metricName;
|
return clusterPhyId + "@" + brokerId + "@" + metricName;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
|
public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
|
||||||
return clusterPhyId + "@" + topicName + "@" + metricName;
|
return clusterPhyId + "@" + topicName + "@" + metricName;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
|
public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
|
||||||
return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
|
return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**************************************************** private method ****************************************************/
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,9 +110,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
|
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
|
||||||
|
String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
|
||||||
|
|
||||||
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
|
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
|
||||||
if(null != keyValue) {
|
if(null != keyValue) {
|
||||||
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
|
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
|
||||||
brokerMetrics.putMetric(metric, keyValue);
|
brokerMetrics.putMetric(metric, keyValue);
|
||||||
@@ -124,7 +125,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
|
|||||||
|
|
||||||
Map<String, Float> metricsMap = ret.getData().getMetrics();
|
Map<String, Float> metricsMap = ret.getData().getMetrics();
|
||||||
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
|
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
|
||||||
CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
|
CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@@ -73,5 +73,5 @@ public interface ClusterPhyService {
|
|||||||
* 获取系统已存在的kafka版本列表
|
* 获取系统已存在的kafka版本列表
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
Set<String> getClusterVersionSet();
|
List<String> getClusterVersionList();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -728,13 +728,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
|
|||||||
Long clusterId = param.getClusterId();
|
Long clusterId = param.getClusterId();
|
||||||
|
|
||||||
//1、获取jmx的属性信息
|
//1、获取jmx的属性信息
|
||||||
VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
|
|
||||||
if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
|
|
||||||
|
|
||||||
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
|
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
|
||||||
|
|
||||||
float metricVale = 0f;
|
float metricVale = 0f;
|
||||||
for(Broker broker : brokers){
|
for(Broker broker : brokers) {
|
||||||
Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
|
Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
|
||||||
|
|
||||||
if(null == ret || ret.failed() || null == ret.getData()){continue;}
|
if(null == ret || ret.failed() || null == ret.getData()){continue;}
|
||||||
|
|||||||
@@ -24,8 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.dao.DuplicateKeyException;
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -205,9 +206,12 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getClusterVersionSet() {
|
public List<String> getClusterVersionList() {
|
||||||
List<ClusterPhy> clusterPhyList = listAllClusters();
|
List<ClusterPhy> clusterPhyList = this.listAllClusters();
|
||||||
Set<String> versionSet = clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet());
|
|
||||||
return versionSet;
|
List<String> versionList = new ArrayList<>(clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet()));
|
||||||
|
Collections.sort(versionList);
|
||||||
|
|
||||||
|
return versionList;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.core.service.partition;
|
||||||
|
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface OpPartitionService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 优先副本选举
|
||||||
|
*/
|
||||||
|
Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList);
|
||||||
|
}
|
||||||
@@ -0,0 +1,119 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
|
||||||
|
import kafka.zk.KafkaZkClient;
|
||||||
|
import org.apache.kafka.clients.admin.AdminClient;
|
||||||
|
import org.apache.kafka.clients.admin.ElectLeadersOptions;
|
||||||
|
import org.apache.kafka.clients.admin.ElectLeadersResult;
|
||||||
|
import org.apache.kafka.common.ElectionType;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import scala.jdk.javaapi.CollectionConverters;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST;
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author didi
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaAdminClient kafkaAdminClient;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaAdminZKClient kafkaAdminZKClient;
|
||||||
|
|
||||||
|
public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected VersionItemTypeEnum getVersionItemType() {
|
||||||
|
return SERVICE_OP_PARTITION_LEADER;
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
private void init() {
|
||||||
|
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient);
|
||||||
|
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList) {
|
||||||
|
try {
|
||||||
|
return (Result<Void>) doVCHandler(
|
||||||
|
clusterPhyId,
|
||||||
|
PREFERRED_REPLICA_ELECTION,
|
||||||
|
new BatchPartitionParam(clusterPhyId, tpList)
|
||||||
|
);
|
||||||
|
} catch (VCHandlerNotExistException e) {
|
||||||
|
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**************************************************** private method ****************************************************/
|
||||||
|
|
||||||
|
private Result<Void> preferredReplicaElectionByZKClient(VersionItemParam itemParam) {
|
||||||
|
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
|
||||||
|
|
||||||
|
try {
|
||||||
|
KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId());
|
||||||
|
|
||||||
|
kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet());
|
||||||
|
|
||||||
|
return Result.buildSuc();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception",
|
||||||
|
partitionParam.getClusterPhyId(), e
|
||||||
|
);
|
||||||
|
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Result<Void> preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) {
|
||||||
|
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
|
||||||
|
|
||||||
|
try {
|
||||||
|
AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId());
|
||||||
|
|
||||||
|
ElectLeadersResult electLeadersResult = adminClient.electLeaders(
|
||||||
|
ElectionType.PREFERRED,
|
||||||
|
new HashSet<>(partitionParam.getTpList()),
|
||||||
|
new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
|
||||||
|
);
|
||||||
|
|
||||||
|
electLeadersResult.all().get();
|
||||||
|
|
||||||
|
return Result.buildSuc();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception",
|
||||||
|
partitionParam.getClusterPhyId(), e
|
||||||
|
);
|
||||||
|
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -75,7 +75,9 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
|
public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
|
||||||
List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
|
String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
|
||||||
|
|
||||||
|
List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey);
|
||||||
if(null != metricsList) {
|
if(null != metricsList) {
|
||||||
return Result.buildSuc(metricsList);
|
return Result.buildSuc(metricsList);
|
||||||
}
|
}
|
||||||
@@ -88,12 +90,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
|
|||||||
// 更新cache
|
// 更新cache
|
||||||
PartitionMetrics metrics = metricsResult.getData().get(0);
|
PartitionMetrics metrics = metricsResult.getData().get(0);
|
||||||
metrics.getMetrics().entrySet().forEach(
|
metrics.getMetrics().entrySet().forEach(
|
||||||
metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(
|
metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData())
|
||||||
clusterPhyId,
|
|
||||||
metrics.getTopic(),
|
|
||||||
metricEntry.getKey(),
|
|
||||||
metricsResult.getData()
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
return metricsResult;
|
return metricsResult;
|
||||||
|
|||||||
@@ -77,9 +77,14 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic,
|
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
|
||||||
Integer brokerId, Integer partitionId, String metric){
|
String topic,
|
||||||
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
|
Integer brokerId,
|
||||||
|
Integer partitionId,
|
||||||
|
String metric) {
|
||||||
|
String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
|
||||||
|
|
||||||
|
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
|
||||||
if(null != keyValue){
|
if(null != keyValue){
|
||||||
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
|
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
|
||||||
replicationMetrics.putMetric(metric, keyValue);
|
replicationMetrics.putMetric(metric, keyValue);
|
||||||
@@ -92,11 +97,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
|
|||||||
// 更新cache
|
// 更新cache
|
||||||
ret.getData().getMetrics().entrySet().stream().forEach(
|
ret.getData().getMetrics().entrySet().stream().forEach(
|
||||||
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
|
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
|
||||||
clusterPhyId,
|
replicaMetricsKey,
|
||||||
brokerId,
|
|
||||||
topic,
|
|
||||||
partitionId,
|
|
||||||
metricNameAndValueEntry.getKey(),
|
|
||||||
metricNameAndValueEntry.getValue()
|
metricNameAndValueEntry.getValue()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -120,7 +120,9 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
|
public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
|
||||||
List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
|
String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
|
||||||
|
|
||||||
|
List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey);
|
||||||
if(null != metricsList) {
|
if(null != metricsList) {
|
||||||
return Result.buildSuc(metricsList);
|
return Result.buildSuc(metricsList);
|
||||||
}
|
}
|
||||||
@@ -133,12 +135,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
|
|||||||
// 更新cache
|
// 更新cache
|
||||||
TopicMetrics metrics = metricsResult.getData().get(0);
|
TopicMetrics metrics = metricsResult.getData().get(0);
|
||||||
metrics.getMetrics().entrySet().forEach(
|
metrics.getMetrics().entrySet().forEach(
|
||||||
metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(
|
metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData())
|
||||||
clusterPhyId,
|
|
||||||
metrics.getTopic(),
|
|
||||||
metricEntry.getKey(),
|
|
||||||
metricsResult.getData()
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
return metricsResult;
|
return metricsResult;
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
|
|||||||
public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
|
public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
|
||||||
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
|
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
|
||||||
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
|
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
|
||||||
|
|
||||||
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
|
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
|
||||||
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
|
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
|
||||||
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";
|
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
-- 检查检查配置
|
-- 检查检查配置
|
||||||
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_CLUSTER_NO_CONTROLLER','{ \"value\": 1, \"weight\": 30 } ','集群Controller数错误','know-stream');
|
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_CLUSTER_NO_CONTROLLER','{ \"value\": 1, \"weight\": 30 } ','集群Controller数正常','know-streaming');
|
||||||
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_REQUEST_QUEUE_FULL','{ \"value\": 10, \"weight\": 20 } ','Broker请求队列被打满','know-stream');
|
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_REQUEST_QUEUE_FULL','{ \"value\": 10, \"weight\": 20 } ','Broker-RequestQueueSize指标','know-streaming');
|
||||||
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW','{ \"value\": 0.8, \"weight\": 20 } ','Broker网络处理线程Idle过低','know-stream');
|
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW','{ \"value\": 0.8, \"weight\": 20 } ','Broker-NetworkProcessorAvgIdlePercent指标','know-streaming');
|
||||||
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_GROUP_RE_BALANCE_TOO_FREQUENTLY','{\n \"latestMinutes\": 10,\n \"detectedTimes\": 8,\n \"weight\": 10\n}\n','Group的re-balance太频繁','know-stream');
|
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_GROUP_RE_BALANCE_TOO_FREQUENTLY','{\n \"latestMinutes\": 10,\n \"detectedTimes\": 8,\n \"weight\": 10\n}\n','Group的re-balance频率','know-streaming');
|
||||||
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_NO_LEADER','{ \"value\": 1, \"weight\": 10 } ','Topic无Leader数','know-stream');
|
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_NO_LEADER','{ \"value\": 1, \"weight\": 10 } ','Topic 无Leader数','know-stream');
|
||||||
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_UNDER_REPLICA_TOO_LONG','{ \"latestMinutes\": 10, \"detectedTimes\": 8, \"weight\": 10 } ','Topic长期处于未同步状态','know-stream');
|
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_UNDER_REPLICA_TOO_LONG','{ \"latestMinutes\": 10, \"detectedTimes\": 8, \"weight\": 10 } ','Topic 未同步持续时间','know-streaming');
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -49,7 +49,7 @@ public class MultiClusterPhyController {
|
|||||||
|
|
||||||
@ApiOperation(value = "多物理集群-已存在kafka版本", notes = "")
|
@ApiOperation(value = "多物理集群-已存在kafka版本", notes = "")
|
||||||
@GetMapping(value = "physical-clusters/exist-version")
|
@GetMapping(value = "physical-clusters/exist-version")
|
||||||
public Result<Set<String>> getClusterPhysVersion() {
|
public Result<List<String>> getClusterPhysVersion() {
|
||||||
return Result.buildSuc(clusterPhyService.getClusterVersionSet());
|
return Result.buildSuc(clusterPhyService.getClusterVersionList());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user