Sync code

zengqiao
2022-08-23 19:01:53 +08:00
parent e90c5003ae
commit e1514c901b
76 changed files with 796 additions and 577 deletions

View File: CollectedMetricsLocalCache.java

@@ -8,7 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
import java.util.List;
import java.util.concurrent.TimeUnit;
-public class CollectMetricsLocalCache {
+public class CollectedMetricsLocalCache {
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
.expireAfterWrite(60, TimeUnit.SECONDS)
.maximumSize(2000)
@@ -30,47 +30,47 @@ public class CollectMetricsLocalCache {
.build();
public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
-return brokerMetricsCache.getIfPresent(CollectMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
+return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
}
public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
if (value == null) {
return;
}
-brokerMetricsCache.put(CollectMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
+brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
}
public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
-return topicMetricsCache.getIfPresent(CollectMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
}
public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
if (metricsList == null) {
return;
}
-topicMetricsCache.put(CollectMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
}
public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
-return partitionMetricsCache.getIfPresent(CollectMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
}
public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
if (metricsList == null) {
return;
}
-partitionMetricsCache.put(CollectMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
}
public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
-return replicaMetricsValueCache.getIfPresent(CollectMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
+return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
}
public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
if (value == null) {
return;
}
-replicaMetricsValueCache.put(CollectMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
+replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
}
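The key-builder helpers (genBrokerMetricKey, genClusterTopicMetricKey, genReplicaMetricCacheKey) are called throughout this class but fall outside the hunks shown. A minimal sketch of what they presumably look like; the delimiter and field order are assumptions, not taken from the repository:

// Hypothetical reconstruction of the key builders referenced above; the real
// implementations are outside this diff, and the "@" delimiter is an assumption.
public class CollectedMetricsKeySketch {
    private static final String SEP = "@";

    // Assumed shape: clusterPhyId@brokerId@metricName
    public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
        return clusterPhyId + SEP + brokerId + SEP + metricName;
    }

    // Assumed shape: clusterPhyId@topicName@metricName
    public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
        return clusterPhyId + SEP + topicName + SEP + metricName;
    }

    // Assumed shape: clusterPhyId@brokerId@topicName@partitionId@metricName
    public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName,
                                                  Integer partitionId, String metricName) {
        return clusterPhyId + SEP + brokerId + SEP + topicName + SEP + partitionId + SEP + metricName;
    }
}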

View File: BrokerMetricServiceImpl.java

@@ -25,7 +25,7 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.cache.CollectMetricsLocalCache;
+import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService;
@@ -112,7 +112,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
@Override
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
-Float keyValue = CollectMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
+Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
if(null != keyValue) {
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +124,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
Map<String, Float> metricsMap = ret.getData().getMetrics();
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
-CollectMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
+CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
}
return ret;
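This hunk shows the read-through pattern that each metric service touched by this commit follows: consult the local Caffeine cache, fall back to Kafka/JMX on a miss, then backfill the cache with every metric that came back, so sibling metrics collected as a side effect become cache hits too. A distilled, self-contained sketch of that flow, with illustrative class and key names rather than the project's own:

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Illustrative distillation of the cache-first collection flow above.
public class CacheFirstSketch {
    private static final Cache<String, Float> CACHE = Caffeine.newBuilder()
            .expireAfterWrite(60, TimeUnit.SECONDS)
            .maximumSize(2000)
            .build();

    // Returns the cached value when present; otherwise collects from the
    // source and backfills the cache with everything that came back.
    public static Float getWithCacheFirst(String key, Supplier<Map<String, Float>> collector) {
        Float cached = CACHE.getIfPresent(key);
        if (cached != null) {
            return cached;
        }
        Map<String, Float> collected = collector.get();
        collected.forEach(CACHE::put); // backfill sibling metrics too
        return collected.get(key);
    }
}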

View File: PartitionMetricService.java

@@ -15,6 +15,7 @@ public interface PartitionMetricService {
*/
Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName);
+Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafka(Long clusterPhyId, String topicName, List<String> metricNameList);
Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafka(Long clusterPhyId, String topicName, String metricName);
Result<PartitionMetrics> collectPartitionMetricsFromKafka(Long clusterPhyId, String topicName, Integer partitionId, String metricName);
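The new List<String> overload lets callers fetch several metrics for a topic in one call rather than looping over single-metric collections themselves. A hedged caller-side sketch; Result, PartitionMetrics and PartitionMetricService are the project's own types and are assumed imported, and the metric names are illustrative:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical caller of the new batch overload.
public class BatchCollectSketch {
    static List<PartitionMetrics> collectTwoMetrics(PartitionMetricService service,
                                                    Long clusterPhyId, String topicName) {
        Result<List<PartitionMetrics>> result = service.collectPartitionsMetricsFromKafka(
                clusterPhyId, topicName, Arrays.asList("LogStartOffset", "LogEndOffset"));
        // The batch method returns one merged PartitionMetrics per partition.
        return (result == null || result.failed()) ? Collections.emptyList() : result.getData();
    }
}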

View File: PartitionMetricServiceImpl.java

@@ -16,7 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.cache.CollectMetricsLocalCache;
+import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
@@ -29,10 +29,7 @@ import org.springframework.stereotype.Service;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -78,7 +75,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
@Override
public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
-List<PartitionMetrics> metricsList = CollectMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
+List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
if(null != metricsList) {
return Result.buildSuc(metricsList);
}
@@ -91,7 +88,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
// Update the cache
PartitionMetrics metrics = metricsResult.getData().get(0);
metrics.getMetrics().entrySet().forEach(
-metricEntry -> CollectMetricsLocalCache.putPartitionMetricsList(
+metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(
clusterPhyId,
metrics.getTopic(),
metricEntry.getKey(),
@@ -102,6 +99,32 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
return metricsResult;
}
+@Override
+public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafka(Long clusterPhyId, String topicName, List<String> metricNameList) {
+Set<String> collectedMetricSet = new HashSet<>();
+Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
+for (String metricName: metricNameList) {
+if (collectedMetricSet.contains(metricName)) {
+continue;
+}
+Result<List<PartitionMetrics>> metricsResult = this.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricName);
+if(null == metricsResult || metricsResult.failed() || null == metricsResult.getData() || metricsResult.getData().isEmpty()) {
+continue;
+}
+collectedMetricSet.addAll(metricsResult.getData().get(0).getMetrics().keySet());
+for (PartitionMetrics metrics: metricsResult.getData()) {
+metricsMap.putIfAbsent(metrics.getPartitionId(), metrics);
+metricsMap.get(metrics.getPartitionId()).putMetric(metrics.getMetrics());
+}
+}
+return Result.buildSuc(new ArrayList<>(metricsMap.values()));
+}
@Override
public Result<PartitionMetrics> collectPartitionMetricsFromKafka(Long clusterPhyId, String topicName, Integer partitionId, String metricName){
Result<List<PartitionMetrics>> metricsResult = this.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricName);
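The new batch method above deduplicates work: collecting one metric name can return sibling metrics as a side effect, so every metric already seen is recorded in collectedMetricSet and skipped on later iterations, while per-partition results are merged by partition id. A self-contained toy version of that merge logic, with plain maps standing in for the PartitionMetrics bean and a fake collector in place of the JMX round-trip:

import java.util.*;

// Toy version of the merge performed by the new
// collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricNameList).
public class PartitionMergeSketch {
    public static void main(String[] args) {
        Map<Integer, Map<String, Float>> metricsMap = new HashMap<>();
        Set<String> collectedMetricSet = new HashSet<>();

        // The second "BytesIn" is skipped: the first round already produced it.
        List<String> metricNameList = Arrays.asList("BytesIn", "BytesIn", "LogSize");
        for (String metricName : metricNameList) {
            if (collectedMetricSet.contains(metricName)) {
                continue; // already collected in an earlier round
            }
            Map<Integer, Map<String, Float>> round = collectOneRound(metricName);
            round.forEach((partitionId, metrics) -> {
                // Merge into the bean already held for this partition, if any.
                metricsMap.computeIfAbsent(partitionId, id -> new HashMap<>()).putAll(metrics);
                collectedMetricSet.addAll(metrics.keySet());
            });
        }
        System.out.println(metricsMap); // {0={BytesIn=1.0, LogSize=2.0}, 1={...}}
    }

    // Fake collector returning one metric per partition.
    private static Map<Integer, Map<String, Float>> collectOneRound(String metricName) {
        Map<Integer, Map<String, Float>> result = new HashMap<>();
        for (int partitionId = 0; partitionId < 2; partitionId++) {
            Map<String, Float> metrics = new HashMap<>();
            metrics.put(metricName, metricName.equals("BytesIn") ? 1.0f : 2.0f);
            result.put(partitionId, metrics);
        }
        return result;
    }
}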

View File: ReassignJobServiceImpl.java

@@ -6,6 +6,7 @@ import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTopicConfigParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.reassign.ExecuteReassignParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.reassign.ReassignResult;
@@ -19,6 +20,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.po.reassign.ReassignJobPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.reassign.ReassignSubJobPO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
+import com.xiaojukeji.know.streaming.km.common.constant.kafka.TopicConfig0100;
import com.xiaojukeji.know.streaming.km.common.converter.ReassignConverter;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobStatusEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
@@ -33,6 +35,7 @@ import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
+import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ReplicaMetricVersionItems;
import com.xiaojukeji.know.streaming.km.persistence.mysql.reassign.ReassignJobDAO;
@@ -79,6 +82,9 @@ public class ReassignJobServiceImpl implements ReassignJobService {
@Autowired
private OpLogWrapService opLogWrapService;
+@Autowired
+private TopicConfigService topicConfigService;
@Override
@Transactional
public Result<Void> create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) {
@@ -266,16 +272,22 @@ public class ReassignJobServiceImpl implements ReassignJobService {
}
// Update the DB status
-this.setJobInRunning(jobPO);
+Result<List<ReassignSubJobPO>> subJobPOListResult = this.setJobInRunning(jobPO);
// 执行任务
Result<Void> rv = reassignService.executePartitionReassignments(new ExecuteReassignParam(jobPO.getClusterPhyId(), jobPO.getReassignmentJson(), jobPO.getThrottleUnitByte()));
if (rv.failed()) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return rv;
}
+// Modify the retention time
+rv = this.modifyRetentionTime(jobPO.getClusterPhyId(), subJobPOListResult.getData(), operator);
+if (rv.failed()) {
+log.error("method=execute||jobId={}||result={}||errMsg=modify retention time failed", jobId, rv);
+return rv;
+}
// Record the operation
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
operator,
@@ -360,6 +372,12 @@ public class ReassignJobServiceImpl implements ReassignJobService {
return Result.buildFromIgnoreData(rrr);
}
+// Restore the data retention time; this can later be optimized to restore the original value after only a short delay
+rv = this.recoveryRetentionTime(jobPO, rrr.getData());
+if (rv != null && rv.failed()) {
+log.error("method=verifyAndUpdateStatue||jobId={}||result={}||msg=recovery retention time failed", jobId, rv);
+}
// Update the job status
return this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
}
@@ -524,7 +542,7 @@ public class ReassignJobServiceImpl implements ReassignJobService {
return Result.buildSuc();
}
-private Result<Void> setJobInRunning(ReassignJobPO jobPO) {
+private Result<List<ReassignSubJobPO>> setJobInRunning(ReassignJobPO jobPO) {
long now = System.currentTimeMillis();
// Update the sub-job statuses
@@ -545,7 +563,7 @@ public class ReassignJobServiceImpl implements ReassignJobService {
newJobPO.setStartTime(new Date(now));
reassignJobDAO.updateById(newJobPO);
-return Result.buildSuc();
+return Result.buildSuc(subJobPOList);
}
@@ -714,4 +732,76 @@ public class ReassignJobServiceImpl implements ReassignJobService {
return replicaMetricsResult.getData().getMetric(ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE);
}
+/**
+ * Restore the retention time
+ */
+private Result<Void> recoveryRetentionTime(ReassignJobPO jobPO, ReassignResult reassignmentResult) {
+Map<String, Long> finishedTopicRetentionTimeMap = new HashMap<>();
+List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobPO.getId());
+for (ReassignSubJobPO subJobPO: subJobPOList) {
+ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class);
+if (extendData == null
+|| extendData.getOriginalRetentionTimeUnitMs() == null
+|| extendData.getReassignRetentionTimeUnitMs() == null
+|| extendData.getOriginalRetentionTimeUnitMs().equals(extendData.getReassignRetentionTimeUnitMs())) {
+// No extend data, or the retention time needs no adjustment, so skip it
+continue;
+}
+finishedTopicRetentionTimeMap.put(subJobPO.getTopicName(), extendData.getOriginalRetentionTimeUnitMs());
+}
+// Keep only the topics whose reassignment has finished
+for (ReassignSubJobPO subJobPO: subJobPOList) {
+if (!reassignmentResult.checkPartitionFinished(subJobPO.getTopicName(), subJobPO.getPartitionId())) {
+// Remove the unfinished topics
+finishedTopicRetentionTimeMap.remove(subJobPO.getTopicName());
+}
+}
+// Restore the retention time of the finished topics
+for (Map.Entry<String, Long> entry: finishedTopicRetentionTimeMap.entrySet()) {
+Map<String, String> changedProps = new HashMap<>();
+changedProps.put(TopicConfig0100.RETENTION_MS_CONFIG, String.valueOf(entry.getValue()));
+Result<Void> rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(jobPO.getClusterPhyId(), entry.getKey(), changedProps), jobPO.getCreator());
+if (rv == null || rv.failed()) {
+return rv;
+}
+}
+return Result.buildSuc();
+}
+private Result<Void> modifyRetentionTime(Long clusterPhyId, List<ReassignSubJobPO> subJobPOList, String operator) {
+Map<String, Long> needModifyTopicRetentionTimeMap = new HashMap<>();
+for (ReassignSubJobPO subJobPO: subJobPOList) {
+ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class);
+if (extendData == null
+|| extendData.getOriginalRetentionTimeUnitMs() == null
+|| extendData.getReassignRetentionTimeUnitMs() == null
+|| extendData.getOriginalRetentionTimeUnitMs().equals(extendData.getReassignRetentionTimeUnitMs())) {
+// No extend data, or the retention time needs no adjustment, so skip it
+continue;
+}
+needModifyTopicRetentionTimeMap.put(subJobPO.getTopicName(), extendData.getReassignRetentionTimeUnitMs());
+}
+// Modify the topics' retention time
+Result<Void> returnRV = Result.buildSuc();
+for (Map.Entry<String, Long> entry: needModifyTopicRetentionTimeMap.entrySet()) {
+Map<String, String> changedProps = new HashMap<>();
+changedProps.put(TopicConfig0100.RETENTION_MS_CONFIG, String.valueOf(entry.getValue()));
+Result<Void> rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, entry.getKey(), changedProps), operator);
+if (rv == null || rv.failed()) {
+returnRV = rv;
+}
+}
+return returnRV;
+}
}
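Both modifyRetentionTime and recoveryRetentionTime deserialize ReassignSubJobExtendData from the sub-job's extendData JSON. That class is not part of this diff; judging only from the getters used above, it presumably carries at least these two fields. A hypothetical reconstruction, the real class may hold more:

// Hypothetical reconstruction of ReassignSubJobExtendData, inferred solely
// from the getters used in this diff; field set and types are assumptions.
public class ReassignSubJobExtendData {
    // Topic retention time (ms) before the reassignment started.
    private Long originalRetentionTimeUnitMs;

    // Temporary retention time (ms) applied while the reassignment runs.
    private Long reassignRetentionTimeUnitMs;

    public Long getOriginalRetentionTimeUnitMs() { return originalRetentionTimeUnitMs; }
    public void setOriginalRetentionTimeUnitMs(Long v) { this.originalRetentionTimeUnitMs = v; }

    public Long getReassignRetentionTimeUnitMs() { return reassignRetentionTimeUnitMs; }
    public void setReassignRetentionTimeUnitMs(Long v) { this.reassignRetentionTimeUnitMs = v; }
}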

View File: ReplicaMetricServiceImpl.java

@@ -17,7 +17,7 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.cache.CollectMetricsLocalCache;
+import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
@@ -79,7 +79,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
@Override
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic,
Integer brokerId, Integer partitionId, String metric){
-Float keyValue = CollectMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
+Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
if(null != keyValue){
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
replicationMetrics.putMetric(metric, keyValue);
@@ -91,7 +91,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
// Update the cache
ret.getData().getMetrics().entrySet().stream().forEach(
-metricNameAndValueEntry -> CollectMetricsLocalCache.putReplicaMetrics(
+metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
clusterPhyId,
brokerId,
topic,

View File: TopicMetricServiceImpl.java

@@ -29,7 +29,7 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.cache.CollectMetricsLocalCache;
+import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
@@ -120,7 +120,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
@Override
public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
-List<TopicMetrics> metricsList = CollectMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
+List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
if(null != metricsList) {
return Result.buildSuc(metricsList);
}
@@ -133,7 +133,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
// Update the cache
TopicMetrics metrics = metricsResult.getData().get(0);
metrics.getMetrics().entrySet().forEach(
-metricEntry -> CollectMetricsLocalCache.putTopicMetrics(
+metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(
clusterPhyId,
metrics.getTopic(),
metricEntry.getKey(),