mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-02 18:32:08 +08:00
v2.8.1_e初始化
1、测试代码,开源用户尽量不要使用; 2、包含Kafka-HA的相关功能,在v2.8.0_e的基础上,补充按照clientId切换的功能; 3、基于v2.8.0_e拉的分支;
This commit is contained in:
@@ -4,13 +4,15 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
/**
|
||||
* Ha App管理
|
||||
*/
|
||||
public interface HaAppManager {
|
||||
Result<List<AppRelateTopicsVO>> appRelateTopics(Long clusterPhyId, List<String> filterTopicNameList);
|
||||
Result<List<AppRelateTopicsVO>> appRelateTopics(Boolean ha, Long clusterPhyId, List<String> filterTopicNameList);
|
||||
Result<List<AppRelateTopicsVO>> appAndClientRelateTopics(Long clusterPhyId, Set<String> filterTopicNameSet);
|
||||
|
||||
boolean isContainAllRelateAppTopics(Long clusterPhyId, List<String> filterTopicNameList);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.biz.ha;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
|
||||
|
||||
@@ -37,6 +38,7 @@ public interface HaTopicManager {
|
||||
Result<HaSwitchTopic> switchHaWithCanRetry(Long newActiveClusterPhyId,
|
||||
Long newStandbyClusterPhyId,
|
||||
List<String> switchTopicNameList,
|
||||
List<KafkaUserAndClientDTO> kafkaUserAndClientIdList,
|
||||
boolean focus,
|
||||
boolean firstTriggerExecute,
|
||||
JobLogDO switchLogTemplate,
|
||||
|
||||
@@ -1,15 +1,25 @@
|
||||
package com.xiaojukeji.kafka.manager.service.biz.ha.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.FutureUtil;
|
||||
import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@@ -22,17 +32,45 @@ public class HaAppManagerImpl implements HaAppManager {
|
||||
@Autowired
|
||||
private HaASRelationService haASRelationService;
|
||||
|
||||
@Autowired
|
||||
private TopicConnectionService topicConnectionService;
|
||||
|
||||
@Autowired
|
||||
private ConfigService configService;
|
||||
|
||||
@Autowired
|
||||
private TopicManagerService topicManagerService;
|
||||
|
||||
private static final FutureUtil<Result<List<AppRelateTopicsVO>>> ConnectionsSearchTP = FutureUtil.init(
|
||||
"ConnectionsSearchTP",
|
||||
5,
|
||||
5,
|
||||
500
|
||||
);
|
||||
|
||||
@Override
|
||||
public Result<List<AppRelateTopicsVO>> appRelateTopics(Long clusterPhyId, List<String> filterTopicNameList) {
|
||||
public Result<List<AppRelateTopicsVO>> appRelateTopics(Boolean ha, Long clusterPhyId, List<String> filterTopicNameList) {
|
||||
// 获取关联的Topic列表
|
||||
Map<String, Set<String>> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList);
|
||||
|
||||
Map<String, Set<String>> appClientSetMap = haASRelationService.listAllHAClient(clusterPhyId, userTopicMap.keySet());
|
||||
|
||||
// 获取集群已建立HA的Topic列表
|
||||
Set<String> haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC)
|
||||
.stream()
|
||||
.map(elem -> elem.getActiveResName())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Set<String> topicNameSet = null;
|
||||
if (ha) {
|
||||
topicNameSet = haTopicNameSet;
|
||||
}else {
|
||||
List<TopicDO> topicDOS = topicManagerService.getByClusterId(clusterPhyId);
|
||||
topicNameSet = topicDOS.stream()
|
||||
.filter(topicBizPO -> !haTopicNameSet.contains(topicBizPO.getTopicName()))
|
||||
.map(TopicDO::getTopicName).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
Set<String> filterTopicNameSet = new HashSet<>(filterTopicNameList);
|
||||
|
||||
List<AppRelateTopicsVO> voList = new ArrayList<>();
|
||||
@@ -40,16 +78,18 @@ public class HaAppManagerImpl implements HaAppManager {
|
||||
AppRelateTopicsVO vo = new AppRelateTopicsVO();
|
||||
vo.setClusterPhyId(clusterPhyId);
|
||||
vo.setKafkaUser(entry.getKey());
|
||||
vo.setHaClientIdList(new ArrayList<>(appClientSetMap.getOrDefault(entry.getKey(), new HashSet<>())));
|
||||
vo.setSelectedTopicNameList(new ArrayList<>());
|
||||
vo.setNotSelectTopicNameList(new ArrayList<>());
|
||||
vo.setNotHaTopicNameList(new ArrayList<>());
|
||||
Set<String> finalTopicNameSet = topicNameSet;
|
||||
entry.getValue().forEach(elem -> {
|
||||
if (elem.startsWith("__")) {
|
||||
// ignore
|
||||
return;
|
||||
}
|
||||
|
||||
if (!haTopicNameSet.contains(elem)) {
|
||||
if (!finalTopicNameSet.contains(elem)) {
|
||||
vo.getNotHaTopicNameList().add(elem);
|
||||
} else if (filterTopicNameSet.contains(elem)) {
|
||||
vo.getSelectedTopicNameList().add(elem);
|
||||
@@ -64,6 +104,104 @@ public class HaAppManagerImpl implements HaAppManager {
|
||||
return Result.buildSuc(voList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<AppRelateTopicsVO>> appAndClientRelateTopics(Long clusterPhyId, Set<String> filterTopicNameSet) {
|
||||
List<HaASRelationDO> haASRelationDOList = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.CLUSTER);
|
||||
Long secondClusterId = null;
|
||||
for (HaASRelationDO asRelationDO: haASRelationDOList) {
|
||||
if (clusterPhyId.equals(asRelationDO.getActiveClusterPhyId())) {
|
||||
secondClusterId = asRelationDO.getStandbyClusterPhyId();
|
||||
} else {
|
||||
secondClusterId = asRelationDO.getActiveClusterPhyId();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
Map<String/*TopicName*/, Result<Map<String/*KafkaUser*/, Set<String>/*ClientID*/>>> connectionsResultMap = new ConcurrentHashMap<>();
|
||||
|
||||
// 生效时间
|
||||
Long activeMin = configService.getLongValue(ConfigConstant.HA_CONNECTION_ACTIVE_TIME_UNIT_MIN, Constant.TOPIC_CONNECTION_LATEST_TIME_MS / 1000 / 60);
|
||||
|
||||
// 获取Topic关联的连接
|
||||
for (String topicName: filterTopicNameSet) {
|
||||
Long tempSecondClusterId = secondClusterId;
|
||||
ConnectionsSearchTP.runnableTask(
|
||||
String.format("clusterPhyId=%d||topicName=%s", clusterPhyId, topicName),
|
||||
10000,
|
||||
() -> {
|
||||
Result<Map<String, Set<String>>> userAndClientMapResult = topicConnectionService.getHaKafkaUserAndClientIdByTopicName(
|
||||
clusterPhyId,
|
||||
tempSecondClusterId,
|
||||
topicName,
|
||||
new Date(System.currentTimeMillis() - activeMin * 60L * 1000L),
|
||||
new Date()
|
||||
);
|
||||
|
||||
connectionsResultMap.put(topicName, userAndClientMapResult);
|
||||
}
|
||||
);
|
||||
|
||||
ConnectionsSearchTP.waitExecute(10000);
|
||||
}
|
||||
|
||||
// 因为接口比较重要,只要一出现异常,则直接返回错误
|
||||
for (Result<Map<String, Set<String>>> valueResult: connectionsResultMap.values()) {
|
||||
if (valueResult.failed()) {
|
||||
return Result.buildFromIgnoreData(valueResult);
|
||||
}
|
||||
}
|
||||
|
||||
// 查询结果转Map
|
||||
Map<String/*KafkaUser*/, Set<String>/*ClientID*/> kafkaUserAndClientMap = new HashMap<>();
|
||||
for (Result<Map<String, Set<String>>> valueResult: connectionsResultMap.values()) {
|
||||
for (Map.Entry<String, Set<String>> entry: valueResult.getData().entrySet()) {
|
||||
kafkaUserAndClientMap.putIfAbsent(entry.getKey(), new HashSet<>());
|
||||
kafkaUserAndClientMap.get(entry.getKey()).addAll(entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// 获取集群已建立HA的Topic列表
|
||||
Set<String> haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC)
|
||||
.stream()
|
||||
.map(elem -> elem.getActiveResName())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// 获取KafkaUser+Client下的Topic列表
|
||||
List<AppRelateTopicsVO> voList = new ArrayList<>();
|
||||
for (Map.Entry<String, Set<String>> entry: kafkaUserAndClientMap.entrySet()) {
|
||||
Long tempSecondClusterId = secondClusterId;
|
||||
ConnectionsSearchTP.runnableTask(
|
||||
"",
|
||||
10000,
|
||||
() -> {
|
||||
Result<List<TopicConnectionDO>> doListResult = topicConnectionService.getByClusterAndAppId(
|
||||
clusterPhyId,
|
||||
tempSecondClusterId,
|
||||
entry.getKey(),
|
||||
new Date(System.currentTimeMillis() - activeMin * 60L * 1000L),
|
||||
new Date()
|
||||
);
|
||||
if (doListResult.failed()) {
|
||||
return Result.buildFromIgnoreData(doListResult);
|
||||
}
|
||||
|
||||
return Result.buildSuc(convert2VOList(clusterPhyId, entry.getValue(), doListResult.getData(), haTopicNameSet, filterTopicNameSet));
|
||||
}
|
||||
);
|
||||
|
||||
for (Result<List<AppRelateTopicsVO>> elem: ConnectionsSearchTP.waitResult(10000)) {
|
||||
if (elem.failed()) {
|
||||
Result.buildFromIgnoreData(elem);
|
||||
}
|
||||
|
||||
voList.addAll(elem.getData());
|
||||
}
|
||||
}
|
||||
|
||||
return Result.buildSuc(voList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isContainAllRelateAppTopics(Long clusterPhyId, List<String> filterTopicNameList) {
|
||||
Map<String, Set<String>> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList);
|
||||
@@ -91,4 +229,41 @@ public class HaAppManagerImpl implements HaAppManager {
|
||||
|
||||
return userTopicMap;
|
||||
}
|
||||
|
||||
private List<AppRelateTopicsVO> convert2VOList(Long clusterPhyId,
|
||||
Set<String> clientIdSet,
|
||||
List<TopicConnectionDO> connectionList,
|
||||
Set<String> haTopicNameSet,
|
||||
Set<String> filterTopicNameSet) {
|
||||
Map<String/*clientID*/, AppRelateTopicsVO> voMap = new HashMap<>();
|
||||
for (TopicConnectionDO connection: connectionList) {
|
||||
if (connection.getTopicName().startsWith("__")) {
|
||||
// 忽略系统内部Topic
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!clientIdSet.contains("") && !clientIdSet.contains(connection.getClientId())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
AppRelateTopicsVO vo = voMap.get(connection.getClientId());
|
||||
if (vo == null) {
|
||||
vo = new AppRelateTopicsVO(clusterPhyId, connection.getAppId(), connection.getClientId());
|
||||
}
|
||||
|
||||
if (!haTopicNameSet.contains(connection.getTopicName())) {
|
||||
vo.addNotHaIfNotExist(connection.getTopicName());
|
||||
}
|
||||
|
||||
if (!filterTopicNameSet.contains(connection.getTopicName())) {
|
||||
vo.addNotSelectedIfNotExist(connection.getTopicName());
|
||||
} else {
|
||||
vo.addSelectedIfNotExist(connection.getTopicName());
|
||||
}
|
||||
|
||||
voMap.put(connection.getClientId(), vo);
|
||||
}
|
||||
|
||||
return new ArrayList<>(voMap.values());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||
@@ -14,13 +15,14 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.HAUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.AdminService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.JobLogService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ha.HaKafkaUserService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ha.HaTopicService;
|
||||
@@ -28,6 +30,7 @@ import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
@@ -40,9 +43,6 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@Autowired
|
||||
private AuthorityService authorityService;
|
||||
|
||||
@Autowired
|
||||
private HaTopicService haTopicService;
|
||||
|
||||
@@ -61,10 +61,14 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
@Autowired
|
||||
private JobLogService jobLogService;
|
||||
|
||||
@Autowired
|
||||
private AdminService adminService;
|
||||
|
||||
@Override
|
||||
public Result<HaSwitchTopic> switchHaWithCanRetry(Long newActiveClusterPhyId,
|
||||
Long newStandbyClusterPhyId,
|
||||
List<String> switchTopicNameList,
|
||||
List<KafkaUserAndClientDTO> kafkaUserAndClientIdList,
|
||||
boolean focus,
|
||||
boolean firstTriggerExecute,
|
||||
JobLogDO switchLogTemplate,
|
||||
@@ -106,7 +110,7 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
}
|
||||
|
||||
// 4、进行切换预处理
|
||||
HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, focus, switchLogTemplate);
|
||||
HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, kafkaUserAndClientIdList, focus, switchLogTemplate);
|
||||
|
||||
// 5、直接等待10秒,使得相关数据有机会同步完成
|
||||
BackoffUtils.backoff(10000);
|
||||
@@ -125,7 +129,15 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
switchTopic.addHaSwitchTopic(this.newStandbyTopicAddFetchConfig(newActiveClusterPhyDO, newStandbyClusterPhyDO, doList, focus, switchLogTemplate, operator));
|
||||
|
||||
// 9、进行切换收尾
|
||||
switchTopic.addHaSwitchTopic(this.closeoutSwitching(newActiveClusterPhyDO, newStandbyClusterPhyDO, configUtils.getDKafkaGatewayZK(), doList, focus, switchLogTemplate));
|
||||
switchTopic.addHaSwitchTopic(this.closeoutSwitching(
|
||||
newActiveClusterPhyDO,
|
||||
newStandbyClusterPhyDO,
|
||||
configUtils.getDKafkaGatewayZK(),
|
||||
doList,
|
||||
kafkaUserAndClientIdList,
|
||||
focus,
|
||||
switchLogTemplate
|
||||
));
|
||||
|
||||
// 10、状态结果汇总记录
|
||||
doList.forEach(elem -> switchTopic.addActiveTopicStatus(elem.getActiveResName(), elem.getStatus()));
|
||||
@@ -136,6 +148,18 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(switchTopicNameList), switchTopic, operator
|
||||
);
|
||||
|
||||
if (switchTopic.isFinished()) {
|
||||
// 全都切换完成,则更新HA信息
|
||||
try {
|
||||
updateHAClient(newActiveClusterPhyId, newStandbyClusterPhyId, kafkaUserAndClientIdList);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=switchHaWithCanRetry||newActiveClusterPhyId={}||newStandbyClusterPhyId={}||kafkaUserAndClientIdList={}||operator={}||errMsg=exception",
|
||||
newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(kafkaUserAndClientIdList), operator, e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return Result.buildSuc(switchTopic);
|
||||
}
|
||||
|
||||
@@ -188,6 +212,20 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
}
|
||||
|
||||
Result<Void> rv = haTopicService.deleteHA(relationDO.getActiveClusterPhyId(), relationDO.getStandbyClusterPhyId(), topicName, operator);
|
||||
|
||||
//删除备topic资源
|
||||
if (dto.getRetainStandbyResource() != null && !dto.getRetainStandbyResource()) {
|
||||
ResultStatus statusEnum = adminService.deleteTopic(
|
||||
PhysicalClusterMetadataManager.getClusterFromCache(dto.getStandbyClusterId()),
|
||||
topicName,
|
||||
operator);
|
||||
if (statusEnum.getCode() != ResultStatus.SUCCESS.getCode()){
|
||||
LOGGER.error(
|
||||
"method=batchRemoveHaTopic||activeClusterPhyId={}||standbyClusterPhyId={}||topicName={}||result={}||msg=delete standby topic failed.",
|
||||
dto.getActiveClusterId(), dto.getStandbyClusterId(), topicName, statusEnum
|
||||
);
|
||||
}
|
||||
}
|
||||
operationResultList.add(TopicOperationResult.buildFrom(dto.getActiveClusterId(), topicName, rv));
|
||||
}
|
||||
|
||||
@@ -200,58 +238,43 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
jobLogService.addLogAndIgnoreException(switchLogTemplate.setAndCopyNew(new Date(), content));
|
||||
}
|
||||
|
||||
/**
|
||||
* 切换预处理
|
||||
* 1、在主集群上,将Topic关联的KafkaUser的active集群设置为None
|
||||
*/
|
||||
private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO, List<HaASRelationDO> doList, boolean focus, JobLogDO switchLogTemplate) {
|
||||
// 暂停HA的KafkaUser
|
||||
Set<String> stoppedHaKafkaUserSet = new HashSet<>();
|
||||
|
||||
private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO,
|
||||
List<HaASRelationDO> doList,
|
||||
List<KafkaUserAndClientDTO> clientDTOList,
|
||||
boolean focus,
|
||||
JobLogDO switchLogTemplate) {
|
||||
HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
|
||||
|
||||
boolean allSuccess = true; // 所有都成功
|
||||
boolean needLog = false; // 需要记录日志
|
||||
for (HaASRelationDO relationDO: doList) {
|
||||
if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE)) {
|
||||
// 当前不处于prepare状态
|
||||
haSwitchTopic.setFinished(true);
|
||||
continue;
|
||||
}
|
||||
needLog = true;
|
||||
|
||||
// 获取关联的KafkaUser
|
||||
Set<String> relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName())
|
||||
.stream()
|
||||
.map(elem -> elem.getAppId())
|
||||
.filter(kafkaUser -> !stoppedHaKafkaUserSet.contains(kafkaUser))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// 暂停kafkaUser HA
|
||||
for (String kafkaUser: relatedKafkaUserSet) {
|
||||
Result<Void> rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), kafkaUser);
|
||||
if (rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
}
|
||||
}
|
||||
|
||||
// 记录操作过的user
|
||||
stoppedHaKafkaUserSet.addAll(relatedKafkaUserSet);
|
||||
|
||||
// 修改Topic主备状态
|
||||
relationDO.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
|
||||
haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
|
||||
// 存在prepare状态的,则就需要进行预处理操作
|
||||
boolean needDOIt = doList.stream().filter(elem -> elem.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE)).count() > 0;
|
||||
if (!needDOIt) {
|
||||
// 不需要做
|
||||
return haSwitchTopic;
|
||||
}
|
||||
|
||||
if (needLog) {
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
|
||||
// 暂停kafkaUser HA
|
||||
for (KafkaUserAndClientDTO dto: clientDTOList) {
|
||||
Result<Void> rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId()));
|
||||
if (rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
}
|
||||
}
|
||||
|
||||
// 修改Topic主备状态
|
||||
doList.forEach(elem -> {
|
||||
elem.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
|
||||
haASRelationService.updateRelationStatus(elem.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
|
||||
});
|
||||
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
|
||||
|
||||
haSwitchTopic.setFinished(true);
|
||||
return haSwitchTopic;
|
||||
}
|
||||
@@ -412,87 +435,76 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
* 2、原先的备集群-修改user的active集群,指向新的主集群
|
||||
* 3、网关-修改user的active集群,指向新的主集群
|
||||
*/
|
||||
private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO, ClusterDO newStandbyClusterPhyDO, String gatewayZK, List<HaASRelationDO> doList, boolean focus, JobLogDO switchLogTemplate) {
|
||||
// 暂停HA的KafkaUser
|
||||
Set<String> activeHaKafkaUserSet = new HashSet<>();
|
||||
private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO,
|
||||
ClusterDO newStandbyClusterPhyDO,
|
||||
String gatewayZK,
|
||||
List<HaASRelationDO> doList,
|
||||
List<KafkaUserAndClientDTO> kafkaUserAndClientDTOList,
|
||||
boolean focus,
|
||||
JobLogDO switchLogTemplate) {
|
||||
HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
|
||||
|
||||
boolean needDOIt = doList.stream().filter(elem -> elem.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE)).count() > 0;
|
||||
if (!needDOIt) {
|
||||
// 不需要做任何事情
|
||||
return haSwitchTopic;
|
||||
}
|
||||
|
||||
boolean allSuccess = true;
|
||||
boolean needLog = false;
|
||||
boolean forceAndNewStandbyFailed = false; // 强制切换,但是新的备依旧操作失败
|
||||
|
||||
HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
|
||||
for (HaASRelationDO relationDO: doList) {
|
||||
if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE)) {
|
||||
// 当前不处于closeout状态
|
||||
for (KafkaUserAndClientDTO dto: kafkaUserAndClientDTOList) {
|
||||
String zkNodeName = HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId());
|
||||
|
||||
// 操作新的主集群
|
||||
Result<Void> rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), zkNodeName);
|
||||
if (rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
continue;
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
}
|
||||
|
||||
needLog = true;
|
||||
|
||||
// 获取关联的KafkaUser
|
||||
Set<String> relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName())
|
||||
.stream()
|
||||
.map(elem -> elem.getAppId())
|
||||
.filter(kafkaUser -> !activeHaKafkaUserSet.contains(kafkaUser))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
for (String kafkaUser: relatedKafkaUserSet) {
|
||||
// 操作新的主集群
|
||||
Result<Void> rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser);
|
||||
if (rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
}
|
||||
|
||||
// 操作新的备集群,如果出现错误,则下次就不再进行操作ZK。新的备的Topic不是那么重要,因此这里允许出现跳过
|
||||
rv = null;
|
||||
if (!forceAndNewStandbyFailed) {
|
||||
// 如果对备集群的操作过程中,出现了失败,则直接跳过
|
||||
rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser);
|
||||
}
|
||||
|
||||
if (rv != null && rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv != null && rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
forceAndNewStandbyFailed = true;
|
||||
}
|
||||
|
||||
// 操作网关
|
||||
rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), kafkaUser);
|
||||
if (rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
}
|
||||
// 操作新的备集群,如果出现错误,则下次就不再进行操作ZK。新的备的Topic不是那么重要,因此这里允许出现跳过
|
||||
rv = null;
|
||||
if (!forceAndNewStandbyFailed) {
|
||||
// 如果对备集群的操作过程中,出现了失败,则直接跳过
|
||||
rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), zkNodeName);
|
||||
}
|
||||
|
||||
// 记录已经激活的User
|
||||
activeHaKafkaUserSet.addAll(relatedKafkaUserSet);
|
||||
if (rv != null && rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv != null && rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
forceAndNewStandbyFailed = true;
|
||||
}
|
||||
|
||||
// 修改Topic主备信息
|
||||
// 操作网关
|
||||
rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), zkNodeName);
|
||||
if (rv.failed() && !focus) {
|
||||
haSwitchTopic.setFinished(false);
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
|
||||
return haSwitchTopic;
|
||||
} else if (rv.failed() && focus) {
|
||||
allSuccess = false;
|
||||
}
|
||||
}
|
||||
|
||||
// 修改Topic主备信息
|
||||
doList.forEach(elem -> {
|
||||
HaASRelationDO newHaASRelationDO = new HaASRelationDO(
|
||||
newActiveClusterPhyDO.getId(), relationDO.getActiveResName(),
|
||||
newStandbyClusterPhyDO.getId(), relationDO.getStandbyResName(),
|
||||
newActiveClusterPhyDO.getId(), elem.getActiveResName(),
|
||||
newStandbyClusterPhyDO.getId(), elem.getStandbyResName(),
|
||||
HaResTypeEnum.TOPIC.getCode(),
|
||||
HaStatusEnum.STABLE_CODE
|
||||
);
|
||||
newHaASRelationDO.setId(relationDO.getId());
|
||||
newHaASRelationDO.setId(elem.getId());
|
||||
|
||||
haASRelationService.updateById(newHaASRelationDO);
|
||||
}
|
||||
|
||||
if (!needLog) {
|
||||
return haSwitchTopic;
|
||||
}
|
||||
});
|
||||
|
||||
this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
|
||||
return haSwitchTopic;
|
||||
@@ -556,4 +568,45 @@ public class HaTopicManagerImpl implements HaTopicManager {
|
||||
|
||||
return Result.buildSuc(relationDO);
|
||||
}
|
||||
|
||||
private void updateHAClient(Long newActiveClusterPhyId,
|
||||
Long newStandbyClusterPhyId,
|
||||
List<KafkaUserAndClientDTO> kafkaUserAndClientIdList) {
|
||||
if (ValidateUtils.isEmptyList(kafkaUserAndClientIdList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<HaASRelationDO> doList = haASRelationService.listAllHAFromDB(newActiveClusterPhyId, HaResTypeEnum.KAFKA_USER_AND_CLIENT);
|
||||
|
||||
Map<String, HaASRelationDO> resNameMap = new HashMap<>();
|
||||
doList.forEach(elem -> resNameMap.put(elem.getActiveResName(), elem));
|
||||
|
||||
for (KafkaUserAndClientDTO dto: kafkaUserAndClientIdList) {
|
||||
if (ValidateUtils.isBlank(dto.getClientId())) {
|
||||
continue;
|
||||
}
|
||||
String resName = HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId());
|
||||
|
||||
HaASRelationDO newDO = new HaASRelationDO(
|
||||
newActiveClusterPhyId,
|
||||
resName,
|
||||
newStandbyClusterPhyId,
|
||||
resName,
|
||||
HaResTypeEnum.KAFKA_USER_AND_CLIENT.getCode(),
|
||||
HaStatusEnum.STABLE_CODE
|
||||
);
|
||||
|
||||
HaASRelationDO oldDO = resNameMap.remove(resName);
|
||||
if (oldDO != null) {
|
||||
newDO.setId(oldDO.getId());
|
||||
haASRelationService.updateById(newDO);
|
||||
} else {
|
||||
try {
|
||||
haASRelationService.addHAToDB(newDO);
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,10 +22,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.ha.job.HaJobDetailVO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.FutureUtil;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.*;
|
||||
import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager;
|
||||
import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager;
|
||||
import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
|
||||
@@ -95,19 +92,20 @@ public class HaASSwitchJobManagerImpl implements HaASSwitchJobManager {
|
||||
|
||||
LOGGER.info("method=createJob||activeClusterPhyId={}||switchTopics={}||operator={}", dto.getActiveClusterPhyId(), ConvertUtil.obj2Json(haTopicSetResult.getData()), operator);
|
||||
|
||||
// 2、查看是否将KafkaUser关联的Topic都涵盖了
|
||||
if (dto.getMustContainAllKafkaUserTopics() != null
|
||||
&& dto.getMustContainAllKafkaUserTopics()
|
||||
&& (dto.getAll() == null || !dto.getAll())
|
||||
&& !haAppManager.isContainAllRelateAppTopics(dto.getActiveClusterPhyId(), dto.getTopicNameList())) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "存在KafkaUser关联的Topic未选中");
|
||||
}
|
||||
// // 2、查看是否将KafkaUser关联的Topic都涵盖了
|
||||
// if (dto.getMustContainAllKafkaUserTopics() != null
|
||||
// && dto.getMustContainAllKafkaUserTopics()
|
||||
// && (dto.getAll() == null || !dto.getAll())
|
||||
// && !haAppManager.isContainAllRelateAppTopics(dto.getActiveClusterPhyId(), dto.getTopicNameList())) {
|
||||
// return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "存在KafkaUser关联的Topic未选中");
|
||||
// }
|
||||
|
||||
// 3、创建任务
|
||||
Result<Long> longResult = haASSwitchJobService.createJob(
|
||||
dto.getActiveClusterPhyId(),
|
||||
dto.getStandbyClusterPhyId(),
|
||||
new ArrayList<>(haTopicSetResult.getData()),
|
||||
dto.getKafkaUserAndClientIdList(),
|
||||
operator
|
||||
);
|
||||
if (longResult.failed()) {
|
||||
@@ -176,6 +174,7 @@ public class HaASSwitchJobManagerImpl implements HaASSwitchJobManager {
|
||||
jobDO.getActiveClusterPhyId(),
|
||||
jobDO.getStandbyClusterPhyId(),
|
||||
subJobDOList.stream().map(elem -> elem.getActiveResName()).collect(Collectors.toList()),
|
||||
jobDO.getExtendRawData(),
|
||||
focus,
|
||||
firstTriggerExecute,
|
||||
new JobLogDO(JobLogBizTypEnum.HA_SWITCH_JOB_LOG.getCode(), String.valueOf(jobId)),
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.gateway;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author zhongyuankai
|
||||
@@ -21,6 +24,14 @@ public interface TopicConnectionService {
|
||||
Date startTime,
|
||||
Date endTime);
|
||||
|
||||
Result<Map<String/*KafkaUser*/, Set<String>/*ClientID*/>> getHaKafkaUserAndClientIdByTopicName(Long firstClusterId,
|
||||
Long secondClusterId,
|
||||
String topicName,
|
||||
Date startTime,
|
||||
Date endTime);
|
||||
|
||||
Set<String> getKafkaUserAndClientIdTopicNames(Set<Long> clusterIdSet, String kafkaUser, String clientId, Date startTime, Date endTime);
|
||||
|
||||
/**
|
||||
* 查询连接信息
|
||||
*/
|
||||
@@ -37,6 +48,8 @@ public interface TopicConnectionService {
|
||||
Date startTime,
|
||||
Date endTime);
|
||||
|
||||
Result<List<TopicConnectionDO>> getByClusterAndAppId(Long firstClusterId, Long secondClusterId, String appId, Date startTime, Date endTime);
|
||||
|
||||
/**
|
||||
* 判断topic是否存在连接
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
||||
@@ -67,6 +69,71 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
|
||||
return getByTopicName(clusterId, doList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Map<String, Set<String>>> getHaKafkaUserAndClientIdByTopicName(Long firstClusterId,
|
||||
Long secondClusterId,
|
||||
String topicName,
|
||||
Date startTime,
|
||||
Date endTime) {
|
||||
List<TopicConnectionDO> doList = new ArrayList<>();
|
||||
try {
|
||||
if (firstClusterId != null) {
|
||||
doList.addAll(topicConnectionDao.getByTopicName(firstClusterId, topicName, startTime, endTime));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get topic connections failed, firstClusterId:{} topicName:{}.", firstClusterId, topicName, e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
if (secondClusterId != null) {
|
||||
doList.addAll(topicConnectionDao.getByTopicName(secondClusterId, topicName, startTime, endTime));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get topic connections failed, secondClusterId:{} topicName:{}.", secondClusterId, topicName, e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
|
||||
}
|
||||
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return Result.buildSuc(new HashMap<>());
|
||||
}
|
||||
|
||||
Map<String, Set<String>> userAndClientMap = new HashMap<>();
|
||||
for (TopicConnectionDO po: doList) {
|
||||
if (!po.getClientId().startsWith("P#") && !po.getClientId().startsWith("C#")) {
|
||||
// 忽略非HA的clientId
|
||||
continue;
|
||||
}
|
||||
|
||||
userAndClientMap.putIfAbsent(po.getAppId(), new HashSet<>());
|
||||
userAndClientMap.get(po.getAppId()).add(po.getClientId());
|
||||
}
|
||||
|
||||
return Result.buildSuc(userAndClientMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getKafkaUserAndClientIdTopicNames(Set<Long> clusterIdSet, String kafkaUser, String clientId, Date startTime, Date endTime) {
|
||||
List<TopicConnectionDO> doList = null;
|
||||
try {
|
||||
doList = topicConnectionDao.getByAppId(kafkaUser, startTime, endTime);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get topic connections failed, kafkaUser:{}.", kafkaUser, e);
|
||||
}
|
||||
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return new HashSet<>();
|
||||
}
|
||||
|
||||
return doList
|
||||
.stream()
|
||||
.filter(elem -> elem.getClientId().equals(clientId) && clusterIdSet.contains(elem.getClusterId()))
|
||||
.map(item -> item.getTopicName())
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TopicConnection> getByTopicName(Long clusterId,
|
||||
String topicName,
|
||||
@@ -102,6 +169,36 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
|
||||
return getByTopicName(null, doList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<TopicConnectionDO>> getByClusterAndAppId(Long firstClusterId, Long secondClusterId, String appId, Date startTime, Date endTime) {
|
||||
List<TopicConnectionDO> doList = new ArrayList<>();
|
||||
try {
|
||||
if (firstClusterId != null) {
|
||||
doList.addAll(topicConnectionDao.getByClusterAndAppId(firstClusterId, appId, startTime, endTime));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get topic connections failed, firstClusterId:{} appId:{}.", firstClusterId, appId, e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
if (secondClusterId != null) {
|
||||
doList.addAll(topicConnectionDao.getByClusterAndAppId(secondClusterId, appId, startTime, endTime));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get topic connections failed, secondClusterId:{} appId:{}.", secondClusterId, appId, e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
|
||||
}
|
||||
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return Result.buildSuc(new ArrayList<>());
|
||||
}
|
||||
|
||||
return Result.buildSuc(doList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExistConnection(Long clusterId,
|
||||
String topicName,
|
||||
@@ -210,6 +307,10 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
|
||||
LOGGER.error("get hostname failed. ip:{}.", connectionDO.getIp(), e);
|
||||
}
|
||||
dto.setHostname(hostName.replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, ""));
|
||||
|
||||
dto.setClientId(connectionDO.getClientId());
|
||||
dto.setRealConnectTime(connectionDO.getRealConnectTime());
|
||||
dto.setCreateTime(connectionDO.getCreateTime().getTime());
|
||||
return dto;
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public interface HaASRelationService {
|
||||
Result<Void> replaceTopicRelationsToDB(Long standbyClusterPhyId, List<HaASRelationDO> topicRelationDOList);
|
||||
@@ -53,6 +55,8 @@ public interface HaASRelationService {
|
||||
*/
|
||||
List<HaASRelationDO> listAllHAFromDB(Long firstClusterPhyId, HaResTypeEnum resTypeEnum);
|
||||
|
||||
Map<String, Set<String>> listAllHAClient(Long firstClusterPhyId, Set<String> kafkaUserSet);
|
||||
|
||||
/**
|
||||
* 获取主备关系
|
||||
*/
|
||||
|
||||
@@ -4,6 +4,7 @@ package com.xiaojukeji.kafka.manager.service.service.ha;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaJobDetail;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaSubJobExtendData;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO;
|
||||
|
||||
@@ -14,7 +15,11 @@ public interface HaASSwitchJobService {
|
||||
/**
|
||||
* 创建任务
|
||||
*/
|
||||
Result<Long> createJob(Long activeClusterPhyId, Long standbyClusterPhyId, List<String> topicNameList, String operator);
|
||||
Result<Long> createJob(Long activeClusterPhyId,
|
||||
Long standbyClusterPhyId,
|
||||
List<String> topicNameList,
|
||||
List<KafkaUserAndClientDTO> kafkaUserAndClientList,
|
||||
String operator);
|
||||
|
||||
/**
|
||||
* 更新任务状态
|
||||
|
||||
@@ -6,6 +6,8 @@ import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.HAUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.Tuple;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.ha.HaASRelationDao;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
|
||||
@@ -14,9 +16,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -177,6 +177,34 @@ public class HaASRelationServiceImpl implements HaASRelationService {
|
||||
return doList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Set<String>> listAllHAClient(Long firstClusterPhyId, Set<String> kafkaUserSet) {
|
||||
LambdaQueryWrapper<HaASRelationDO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(HaASRelationDO::getResType, HaResTypeEnum.KAFKA_USER_AND_CLIENT.getCode());
|
||||
lambdaQueryWrapper.and(lambda ->
|
||||
lambda.eq(HaASRelationDO::getActiveClusterPhyId, firstClusterPhyId).or().eq(HaASRelationDO::getStandbyClusterPhyId, firstClusterPhyId)
|
||||
);
|
||||
|
||||
// 查询HA列表
|
||||
List<HaASRelationDO> doList = haASRelationDao.selectList(lambdaQueryWrapper);
|
||||
if (ValidateUtils.isNull(doList)) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
Map<String, Set<String>> haClientMap = new HashMap<>();
|
||||
doList.forEach(elem -> {
|
||||
Tuple<String, String> data = HAUtils.splitKafkaUserAndClient(elem.getActiveResName());
|
||||
if (data == null || !kafkaUserSet.contains(data.getV1())) {
|
||||
return;
|
||||
}
|
||||
|
||||
haClientMap.putIfAbsent(data.getV1(), new HashSet<>());
|
||||
haClientMap.get(data.getV1()).add(data.getV2());
|
||||
});
|
||||
|
||||
return haClientMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HaASRelationDO> listAllHAFromDB(Long firstClusterPhyId, Long secondClusterPhyId, HaResTypeEnum resTypeEnum) {
|
||||
// 查询HA列表
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.common.bizenum.ha.job.HaJobStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
|
||||
@@ -35,10 +36,22 @@ public class HaASSwitchJobServiceImpl implements HaASSwitchJobService {
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Result<Long> createJob(Long activeClusterPhyId, Long standbyClusterPhyId, List<String> topicNameList, String operator) {
|
||||
public Result<Long> createJob(Long activeClusterPhyId,
|
||||
Long standbyClusterPhyId,
|
||||
List<String> topicNameList,
|
||||
List<KafkaUserAndClientDTO> kafkaUserAndClientList,
|
||||
String operator) {
|
||||
try {
|
||||
// 父任务
|
||||
HaASSwitchJobDO jobDO = new HaASSwitchJobDO(activeClusterPhyId, standbyClusterPhyId, HaJobStatusEnum.RUNNING.getStatus(), operator);
|
||||
HaASSwitchJobDO jobDO = new HaASSwitchJobDO(
|
||||
activeClusterPhyId,
|
||||
standbyClusterPhyId,
|
||||
ValidateUtils.isEmptyList(kafkaUserAndClientList)? 0: 1,
|
||||
kafkaUserAndClientList,
|
||||
HaJobStatusEnum.RUNNING.getStatus(),
|
||||
operator
|
||||
);
|
||||
|
||||
haASSwitchJobDao.insert(jobDO);
|
||||
|
||||
// 子任务
|
||||
|
||||
@@ -6,10 +6,15 @@ import kafka.admin.AdminUtils;
|
||||
import kafka.admin.AdminUtils$;
|
||||
import kafka.server.ConfigType;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLDecoder;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
@@ -40,7 +45,7 @@ public class HaKafkaUserCommands {
|
||||
props.putAll(modifiedProps);
|
||||
|
||||
// 修改配置, 这里不使用changeUserOrUserClientIdConfig方法的原因是changeUserOrUserClientIdConfig这个方法会进行参数检查
|
||||
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), kafkaUser, props);
|
||||
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), sanitize(kafkaUser), props);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("method=changeHaUserConfig||zookeeper={}||kafkaUser={}||modifiedProps={}||errMsg=exception", zookeeper, kafkaUser, modifiedProps, e);
|
||||
return false;
|
||||
@@ -73,7 +78,7 @@ public class HaKafkaUserCommands {
|
||||
}
|
||||
|
||||
// 修改配置, 这里不使用changeUserOrUserClientIdConfig方法的原因是changeUserOrUserClientIdConfig这个方法会进行参数检查
|
||||
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), kafkaUser, presentProps);
|
||||
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), sanitize(kafkaUser), presentProps);
|
||||
|
||||
return true;
|
||||
}catch (Exception e){
|
||||
@@ -90,4 +95,37 @@ public class HaKafkaUserCommands {
|
||||
|
||||
private HaKafkaUserCommands() {
|
||||
}
|
||||
|
||||
private static String sanitize(String name) {
|
||||
String encoded = "";
|
||||
try {
|
||||
encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < encoded.length(); i++) {
|
||||
char c = encoded.charAt(i);
|
||||
if (c == '*') { // Metric ObjectName treats * as pattern
|
||||
builder.append("%2A");
|
||||
} else if (c == '+') { // Space URL-encoded as +, replace with percent encoding
|
||||
builder.append("%20");
|
||||
} else {
|
||||
builder.append(c);
|
||||
}
|
||||
}
|
||||
return builder.toString();
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new KafkaException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Desanitize name that was URL-encoded using {@link #sanitize(String)}. This
|
||||
* is used to obtain the desanitized version of node names in ZooKeeper.
|
||||
*/
|
||||
private static String desanitize(String name) {
|
||||
try {
|
||||
return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new KafkaException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user