mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
合并master分支
This commit is contained in:
@@ -1,15 +1,13 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.acl;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface KafkaAclService {
|
||||
Result<List<AclBinding>> getAclFromKafka(Long clusterPhyId);
|
||||
|
||||
public interface KafkaAclService extends MetaDataService<AclBinding> {
|
||||
List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId);
|
||||
|
||||
Integer countKafkaAclFromDB(Long clusterPhyId);
|
||||
@@ -17,10 +15,5 @@ public interface KafkaAclService {
|
||||
Integer countResTypeAndDistinctFromDB(Long clusterPhyId, ResourceType resourceType);
|
||||
|
||||
Integer countKafkaUserAndDistinctFromDB(Long clusterPhyId);
|
||||
|
||||
List<KafkaAclPO> getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType);
|
||||
|
||||
List<KafkaAclPO> getTopicAclFromDB(Long clusterPhyId, String topicName);
|
||||
|
||||
List<KafkaAclPO> getGroupAclFromDB(Long clusterPhyId, String groupName);
|
||||
}
|
||||
|
||||
@@ -3,10 +3,6 @@ package com.xiaojukeji.know.streaming.km.core.service.acl;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.acl.ACLAtomParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
public interface OpKafkaAclService {
|
||||
/**
|
||||
@@ -19,14 +15,5 @@ public interface OpKafkaAclService {
|
||||
*/
|
||||
Result<Void> deleteKafkaAcl(ACLAtomParam aclAtomParam, String operator);
|
||||
|
||||
/**
|
||||
* 删除ACL
|
||||
*/
|
||||
Result<Void> deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator);
|
||||
|
||||
Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO);
|
||||
|
||||
void batchUpdateAcls(Long clusterPhyId, List<KafkaAclPO> poList);
|
||||
|
||||
int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime);
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||
@@ -18,8 +19,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO;
|
||||
@@ -36,11 +35,13 @@ import org.apache.kafka.common.resource.ResourceType;
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal;
|
||||
import org.apache.kafka.common.utils.SecurityUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import scala.jdk.javaapi.CollectionConverters;
|
||||
|
||||
@@ -77,18 +78,49 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<AclBinding>> getAclFromKafka(Long clusterPhyId) {
|
||||
if (LoadedClusterPhyCache.getByPhyId(clusterPhyId) == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
public Result<List<AclBinding>> getDataFromKafka(ClusterPhy clusterPhy) {
|
||||
try {
|
||||
return (Result<List<AclBinding>>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhyId, ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhyId));
|
||||
Result<List<AclBinding>> dataResult = (Result<List<AclBinding>>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhy.getId(), ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhy.getId()));
|
||||
if (dataResult.failed()) {
|
||||
Result.buildFromIgnoreData(dataResult);
|
||||
}
|
||||
|
||||
return Result.buildSuc(dataResult.getData());
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(e.getResultStatus());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeToDB(Long clusterPhyId, List<AclBinding> dataList) {
|
||||
Map<String, KafkaAclPO> dbPOMap = this.getKafkaAclFromDB(clusterPhyId).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity()));
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
for (AclBinding aclBinding: dataList) {
|
||||
KafkaAclPO newPO = KafkaAclConverter.convert2KafkaAclPO(clusterPhyId, aclBinding, now);
|
||||
KafkaAclPO oldPO = dbPOMap.remove(newPO.getUniqueField());
|
||||
if (oldPO == null) {
|
||||
// 新增的ACL
|
||||
this.insertAndIgnoreDuplicate(newPO);
|
||||
}
|
||||
|
||||
// 不需要update
|
||||
}
|
||||
|
||||
// 删除已经不存在的
|
||||
for (KafkaAclPO dbPO: dbPOMap.values()) {
|
||||
kafkaAclDAO.deleteById(dbPO);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
return kafkaAclDAO.delete(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<KafkaAclPO> getKafkaAclFromDB(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
|
||||
@@ -116,7 +148,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (int)poList.stream().map(elem -> elem.getResourceName()).distinct().count();
|
||||
return (int)poList.stream().map(KafkaAclPO::getResourceName).distinct().count();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -130,15 +162,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (int)poList.stream().map(elem -> elem.getPrincipal()).distinct().count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<KafkaAclPO> getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType) {
|
||||
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
|
||||
queryWrapper.eq(KafkaAclPO::getResourceType, resType);
|
||||
return kafkaAclDAO.selectList(queryWrapper);
|
||||
return (int)poList.stream().map(KafkaAclPO::getPrincipal).distinct().count();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -152,15 +176,6 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
|
||||
return kafkaAclDAO.selectList(queryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<KafkaAclPO> getGroupAclFromDB(Long clusterPhyId, String groupName) {
|
||||
LambdaQueryWrapper<KafkaAclPO> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
|
||||
queryWrapper.eq(KafkaAclPO::getResourceType, ResourceType.GROUP.code());
|
||||
queryWrapper.eq(KafkaAclPO::getResourceName, groupName);
|
||||
return kafkaAclDAO.selectList(queryWrapper);
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<List<AclBinding>> getAclByZKClient(VersionItemParam itemParam){
|
||||
@@ -170,7 +185,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
|
||||
for (ZkAclStore store: CollectionConverters.asJava(ZkAclStore.stores())) {
|
||||
Result<List<AclBinding>> rl = this.getSpecifiedTypeAclByZKClient(param.getClusterPhyId(), store.patternType());
|
||||
if (rl.failed()) {
|
||||
return rl;
|
||||
return Result.buildFromIgnoreData(rl);
|
||||
}
|
||||
|
||||
aclList.addAll(rl.getData());
|
||||
@@ -229,4 +244,19 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen
|
||||
|
||||
return Result.buildSuc(kafkaAclList);
|
||||
}
|
||||
|
||||
private Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
|
||||
try {
|
||||
kafkaAclDAO.insert(kafkaAclPO);
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// 直接写入,如果出现key冲突则直接忽略,因为key冲突时,表示该数据已完整存在,不需要替换任何数据
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
log.error("method=insertAndIgnoreDuplicate||kafkaAclPO={}||errMsg=exception", kafkaAclPO, e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept
|
||||
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO;
|
||||
@@ -32,7 +31,6 @@ import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.acl.*;
|
||||
import org.apache.kafka.common.resource.ResourcePattern;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
@@ -41,8 +39,6 @@ import scala.jdk.javaapi.CollectionConverters;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
|
||||
|
||||
@@ -169,11 +165,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem
|
||||
return rv;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator) {
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) {
|
||||
try {
|
||||
@@ -190,34 +181,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchUpdateAcls(Long clusterPhyId, List<KafkaAclPO> poList) {
|
||||
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
Map<String, KafkaAclPO> dbPOMap = kafkaAclDAO.selectList(lambdaQueryWrapper).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity()));
|
||||
for (KafkaAclPO po: poList) {
|
||||
KafkaAclPO dbPO = dbPOMap.remove(po.getUniqueField());
|
||||
if (dbPO == null) {
|
||||
// 新增的ACL
|
||||
this.insertAndIgnoreDuplicate(po);
|
||||
}
|
||||
}
|
||||
|
||||
// 删除已经不存在的
|
||||
for (KafkaAclPO dbPO: dbPOMap.values()) {
|
||||
kafkaAclDAO.deleteById(dbPO);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) {
|
||||
LambdaQueryWrapper<KafkaAclPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId);
|
||||
lambdaQueryWrapper.le(KafkaAclPO::getUpdateTime, beforeTime);
|
||||
return kafkaAclDAO.delete(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<Void> deleteInDB(KafkaAclPO kafkaAclPO) {
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
@@ -146,6 +147,9 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
|
||||
String.format("删除集群:%s",clusterPhy.toString()));
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
|
||||
// 发布删除集群事件
|
||||
SpringTool.publish(new ClusterPhyDeletedEvent(this, clusterPhyId));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
log.error("method=removeClusterPhyById||clusterPhyId={}||operator={}||msg=remove cluster failed||errMsg=exception!",
|
||||
|
||||
@@ -4,14 +4,16 @@ package com.xiaojukeji.know.streaming.km.core.service.connect.cluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Connect-Cluster
|
||||
*/
|
||||
public interface ConnectClusterService {
|
||||
public interface ConnectClusterService extends MetaDataService<KSGroupDescription> {
|
||||
Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata);
|
||||
|
||||
List<ConnectCluster> listByKafkaCluster(Long kafkaClusterPhyId);
|
||||
|
||||
@@ -24,9 +24,9 @@ import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectClusterMetricESDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.cluster.ConnectClusterMetricESDAO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
@@ -43,7 +43,7 @@ import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultS
|
||||
* @author didi
|
||||
*/
|
||||
@Service
|
||||
public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService implements ConnectClusterMetricService {
|
||||
public class ConnectClusterMetricServiceImpl extends BaseConnectMetricService implements ConnectClusterMetricService {
|
||||
protected static final ILog LOGGER = LogFactory.getLog(ConnectClusterMetricServiceImpl.class);
|
||||
|
||||
public static final String CONNECT_CLUSTER_METHOD_GET_WORKER_METRIC_AVG = "getWorkerMetricAvg";
|
||||
@@ -86,8 +86,7 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService
|
||||
String connectClusterMetricKey = CollectedMetricsLocalCache.genConnectClusterMetricCacheKey(connectClusterPhyId, metric);
|
||||
Float keyValue = CollectedMetricsLocalCache.getConnectClusterMetrics(connectClusterMetricKey);
|
||||
if (keyValue != null) {
|
||||
ConnectClusterMetrics connectClusterMetrics = ConnectClusterMetrics.initWithMetric(connectClusterPhyId,metric,keyValue);
|
||||
return Result.buildSuc(connectClusterMetrics);
|
||||
return Result.buildSuc(new ConnectClusterMetrics(connectClusterPhyId, metric, keyValue));
|
||||
}
|
||||
|
||||
Result<ConnectClusterMetrics> ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, metric);
|
||||
@@ -209,8 +208,7 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService
|
||||
try {
|
||||
//2、获取jmx指标
|
||||
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
|
||||
ConnectWorkerMetrics connectWorkerMetrics = ConnectWorkerMetrics.initWithMetric(connectClusterId, workerId, metric, Float.valueOf(value));
|
||||
return Result.buildSuc(connectWorkerMetrics);
|
||||
return Result.buildSuc(new ConnectWorkerMetrics(connectClusterId, workerId, metric, Float.valueOf(value)));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("method=getConnectWorkerMetricsByJMX||connectClusterId={}||workerId={}||metrics={}||jmx={}||msg={}",
|
||||
connectClusterId, workerId, metric, jmxInfo.getJmxObjectName(), e.getClass().getName());
|
||||
@@ -231,8 +229,8 @@ public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
protected List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
|
||||
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
|
||||
private List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
|
||||
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
|
||||
List<MetricMultiLinesVO> multiLinesVOS = new ArrayList<>();
|
||||
if (map == null || map.isEmpty()) {
|
||||
// 如果为空,则直接返回
|
||||
|
||||
@@ -38,6 +38,14 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
|
||||
@Autowired
|
||||
private OpLogWrapService opLogWrapService;
|
||||
|
||||
@Override
|
||||
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<ConnectClusterPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId);
|
||||
|
||||
return connectClusterDAO.deleteById(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
|
||||
ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName());
|
||||
|
||||
@@ -4,49 +4,30 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluste
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 查看Connector
|
||||
*/
|
||||
public interface ConnectorService {
|
||||
Result<KSConnectorInfo> createConnector(Long connectClusterId, String connectorName, Properties configs, String operator);
|
||||
|
||||
public interface ConnectorService extends MetaDataService<KSConnector> {
|
||||
/**
|
||||
* 获取所有的连接器名称列表
|
||||
*/
|
||||
Result<List<String>> listConnectorsFromCluster(Long connectClusterId);
|
||||
Result<List<String>> listConnectorsFromCluster(ConnectCluster connectCluster);
|
||||
|
||||
/**
|
||||
* 获取单个连接器信息
|
||||
*/
|
||||
Result<KSConnectorInfo> getConnectorInfoFromCluster(Long connectClusterId, String connectorName);
|
||||
|
||||
Result<List<String>> getConnectorTopicsFromCluster(Long connectClusterId, String connectorName);
|
||||
|
||||
Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(Long connectClusterId, String connectorName);
|
||||
|
||||
Result<KSConnector> getAllConnectorInfoFromCluster(Long connectClusterId, String connectorName);
|
||||
|
||||
Result<Void> resumeConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> restartConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> stopConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> deleteConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator);
|
||||
|
||||
void batchReplace(Long kafkaClusterPhyId, Long connectClusterId, List<KSConnector> connectorList, Set<String> allConnectorNameSet);
|
||||
|
||||
void addNewToDB(KSConnector connector);
|
||||
Result<KSConnector> getConnectorFromKafka(Long connectClusterId, String connectorName);
|
||||
|
||||
List<ConnectorPO> listByKafkaClusterIdFromDB(Long kafkaClusterPhyId);
|
||||
|
||||
@@ -57,6 +38,4 @@ public interface ConnectorService {
|
||||
ConnectorPO getConnectorFromDB(Long connectClusterId, String connectorName);
|
||||
|
||||
ConnectorTypeEnum getConnectorType(Long connectClusterId, String connectorName);
|
||||
|
||||
void completeMirrorMakerInfo(ConnectCluster connectCluster, List<KSConnector> connectorList);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.connect.connector;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 查看Connector
|
||||
*/
|
||||
public interface OpConnectorService {
|
||||
Result<KSConnectorInfo> createConnector(Long connectClusterId, String connectorName, Properties configs, String operator);
|
||||
|
||||
Result<Void> resumeConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> restartConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> stopConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> deleteConnector(Long connectClusterId, String connectorName, String operator);
|
||||
|
||||
Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator);
|
||||
|
||||
void addNewToDB(KSConnector connector);
|
||||
}
|
||||
@@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.Connector
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectStatusEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||
@@ -32,7 +33,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.connector.Connector
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -52,7 +53,7 @@ import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultS
|
||||
* @author didi
|
||||
*/
|
||||
@Service
|
||||
public class ConnectorMetricServiceImpl extends BaseConnectorMetricService implements ConnectorMetricService {
|
||||
public class ConnectorMetricServiceImpl extends BaseConnectMetricService implements ConnectorMetricService {
|
||||
protected static final ILog LOGGER = LogFactory.getLog(ConnectorMetricServiceImpl.class);
|
||||
|
||||
public static final String CONNECTOR_METHOD_DO_NOTHING = "doNothing";
|
||||
@@ -67,6 +68,8 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
|
||||
public static final String CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE = "getMetricHealthScore";
|
||||
|
||||
public static final String CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS = "getMetricRunningStatus";
|
||||
|
||||
@Autowired
|
||||
private ConnectorMetricESDAO connectorMetricESDAO;
|
||||
|
||||
@@ -98,11 +101,12 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
@Override
|
||||
protected void initRegisterVCHandler() {
|
||||
registerVCHandler(CONNECTOR_METHOD_DO_NOTHING, this::doNothing);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECT_WORKER_METRIC_SUM, this::getConnectWorkerMetricSum);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_AVG, this::getConnectorTaskMetricsAvg);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_MAX, this::getConnectorTaskMetricsMax);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_SUM, this::getConnectorTaskMetricsSum);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE, this::getMetricHealthScore);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECT_WORKER_METRIC_SUM, this::getConnectWorkerMetricSum);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_AVG, this::getConnectorTaskMetricsAvg);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_MAX, this::getConnectorTaskMetricsMax);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_CONNECTOR_TASK_METRICS_SUM, this::getConnectorTaskMetricsSum);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE, this::getMetricHealthScore);
|
||||
registerVCHandler(CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS, this::getMetricRunningStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -111,8 +115,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
Float keyValue = CollectedMetricsLocalCache.getConnectorMetrics(connectorMetricKey);
|
||||
|
||||
if (null != keyValue) {
|
||||
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterPhyId, connectorName, metric, keyValue);
|
||||
return Result.buildSuc(connectorMetrics);
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterPhyId, connectorName, metric, keyValue));
|
||||
}
|
||||
|
||||
Result<ConnectorMetrics> ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, connectorName, metric);
|
||||
@@ -216,6 +219,20 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
private Result<ConnectorMetrics> getMetricRunningStatus(VersionItemParam metricParam) {
|
||||
ConnectorMetricParam param = (ConnectorMetricParam) metricParam;
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String connectorName = param.getConnectorName();
|
||||
String metricName = param.getMetricName();
|
||||
|
||||
ConnectorPO connector = connectorService.getConnectorFromDB(connectClusterId, connectorName);
|
||||
if (connector == null) {
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metricName, (float)ConnectStatusEnum.UNKNOWN.getStatus()));
|
||||
}
|
||||
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metricName, (float)ConnectStatusEnum.getByValue(connector.getState()).getStatus()));
|
||||
}
|
||||
|
||||
private Result<ConnectorMetrics> getConnectWorkerMetricSum(VersionItemParam metricParam) {
|
||||
ConnectorMetricParam param = (ConnectorMetricParam) metricParam;
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
@@ -240,12 +257,16 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
if (!isCollected) {
|
||||
return Result.buildFailure(NOT_EXIST);
|
||||
}
|
||||
return Result.buildSuc(ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum));
|
||||
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum));
|
||||
}
|
||||
|
||||
//kafka.connect:type=connect-worker-metrics,connector="{connector}" 指标
|
||||
private Result<ConnectorMetrics> getConnectorMetric(Long connectClusterId, String workerId, String connectorName, String metric, ConnectorTypeEnum connectorType) {
|
||||
VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric);
|
||||
if (null == jmxInfo) {
|
||||
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (jmxInfo.getType() != null) {
|
||||
if (connectorType == null) {
|
||||
@@ -257,9 +278,6 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
}
|
||||
}
|
||||
|
||||
if (null == jmxInfo) {
|
||||
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
|
||||
}
|
||||
String jmxObjectName = String.format(jmxInfo.getJmxObjectName(), connectorName);
|
||||
|
||||
JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId);
|
||||
@@ -270,8 +288,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
try {
|
||||
//2、获取jmx指标
|
||||
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString();
|
||||
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, Float.valueOf(value));
|
||||
return Result.buildSuc(connectorMetrics);
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, Float.valueOf(value)));
|
||||
} catch (InstanceNotFoundException e) {
|
||||
// 忽略该错误,该错误出现的原因是该指标在JMX中不存在
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName));
|
||||
@@ -296,8 +313,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
}
|
||||
|
||||
Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get();
|
||||
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum / ret.getData().size());
|
||||
return Result.buildSuc(connectorMetrics);
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum / ret.getData().size()));
|
||||
}
|
||||
|
||||
private Result<ConnectorMetrics> getConnectorTaskMetricsMax(VersionItemParam metricParam){
|
||||
@@ -313,8 +329,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
}
|
||||
|
||||
Float max = ret.getData().stream().max((a, b) -> a.getMetric(metric).compareTo(b.getMetric(metric))).get().getMetric(metric);
|
||||
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, max);
|
||||
return Result.buildSuc(connectorMetrics);
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, max));
|
||||
}
|
||||
|
||||
private Result<ConnectorMetrics> getConnectorTaskMetricsSum(VersionItemParam metricParam){
|
||||
@@ -330,8 +345,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
}
|
||||
|
||||
Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get();
|
||||
ConnectorMetrics connectorMetrics = ConnectorMetrics.initWithMetric(connectClusterId, connectorName, metric, sum);
|
||||
return Result.buildSuc(connectorMetrics);
|
||||
return Result.buildSuc(new ConnectorMetrics(connectClusterId, connectorName, metric, sum));
|
||||
}
|
||||
|
||||
|
||||
@@ -358,6 +372,9 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
|
||||
private Result<ConnectorTaskMetrics> getConnectorTaskMetric(Long connectClusterId, String workerId, String connectorName, Integer taskId, String metric, ConnectorTypeEnum connectorType) {
|
||||
VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric);
|
||||
if (null == jmxInfo) {
|
||||
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (jmxInfo.getType() != null) {
|
||||
if (connectorType == null) {
|
||||
@@ -369,9 +386,6 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
}
|
||||
}
|
||||
|
||||
if (null == jmxInfo) {
|
||||
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
|
||||
}
|
||||
String jmxObjectName=String.format(jmxInfo.getJmxObjectName(), connectorName, taskId);
|
||||
|
||||
JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId);
|
||||
@@ -382,8 +396,7 @@ public class ConnectorMetricServiceImpl extends BaseConnectorMetricService imple
|
||||
try {
|
||||
//2、获取jmx指标
|
||||
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString();
|
||||
ConnectorTaskMetrics connectorTaskMetrics = ConnectorTaskMetrics.initWithMetric(connectClusterId, connectorName, taskId, metric, Float.valueOf(value));
|
||||
return Result.buildSuc(connectorTaskMetrics);
|
||||
return Result.buildSuc(new ConnectorTaskMetrics(connectClusterId, connectorName, taskId, metric, Float.valueOf(value)));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("method=getConnectorTaskMetric||connectClusterId={}||workerId={}||connectorName={}||taskId={}||metrics={}||jmx={}||msg={}",
|
||||
connectClusterId, workerId, connectorName, taskId, metric, jmxObjectName, e.getClass().getName());
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.core.service.connect.connector.impl;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
|
||||
@@ -13,19 +12,14 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.component.RestTool;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectorDAO;
|
||||
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
|
||||
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
|
||||
@@ -34,14 +28,9 @@ import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_CONNECT_CONNECTOR;
|
||||
|
||||
@Service
|
||||
public class ConnectorServiceImpl extends BaseVersionControlService implements ConnectorService {
|
||||
public class ConnectorServiceImpl implements ConnectorService {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ConnectorServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
@@ -53,79 +42,14 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
|
||||
@Autowired
|
||||
private ConnectClusterService connectClusterService;
|
||||
|
||||
@Autowired
|
||||
private OpLogWrapService opLogWrapService;
|
||||
|
||||
private static final String LIST_CONNECTORS_URI = "/connectors";
|
||||
private static final String GET_CONNECTOR_INFO_PREFIX_URI = "/connectors";
|
||||
private static final String GET_CONNECTOR_TOPICS_URI = "/connectors/%s/topics";
|
||||
private static final String GET_CONNECTOR_STATUS_URI = "/connectors/%s/status";
|
||||
|
||||
private static final String CREATE_CONNECTOR_URI = "/connectors";
|
||||
private static final String RESUME_CONNECTOR_URI = "/connectors/%s/resume";
|
||||
private static final String RESTART_CONNECTOR_URI = "/connectors/%s/restart";
|
||||
private static final String PAUSE_CONNECTOR_URI = "/connectors/%s/pause";
|
||||
private static final String DELETE_CONNECTOR_URI = "/connectors/%s";
|
||||
private static final String UPDATE_CONNECTOR_CONFIG_URI = "/connectors/%s/config";
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return SERVICE_OP_CONNECT_CONNECTOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<KSConnectorInfo> createConnector(Long connectClusterId, String connectorName, Properties configs, String operator) {
|
||||
public Result<List<String>> listConnectorsFromCluster(ConnectCluster connectCluster) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
// 构造参数
|
||||
Properties props = new Properties();
|
||||
props.put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, connectorName);
|
||||
props.put("config", configs);
|
||||
|
||||
ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent(
|
||||
connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI,
|
||||
props,
|
||||
ConnectorInfo.class
|
||||
);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.ADD.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
ConvertUtil.obj2Json(configs)
|
||||
));
|
||||
|
||||
KSConnectorInfo connector = new KSConnectorInfo();
|
||||
connector.setConnectClusterId(connectClusterId);
|
||||
connector.setConfig(connectorInfo.config());
|
||||
connector.setName(connectorInfo.name());
|
||||
connector.setTasks(connectorInfo.tasks());
|
||||
connector.setType(connectorInfo.type());
|
||||
|
||||
return Result.buildSuc(connector);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=createConnector||connectClusterId={}||connectorName={}||configs={}||operator={}||errMsg=exception",
|
||||
connectClusterId, connectorName, configs, operator, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<String>> listConnectorsFromCluster(Long connectClusterId) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
List<String> nameList = restTool.getArrayObjectWithJsonContent(
|
||||
connectCluster.getSuitableRequestUrl() + LIST_CONNECTORS_URI,
|
||||
new HashMap<>(),
|
||||
@@ -135,8 +59,8 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
|
||||
return Result.buildSuc(nameList);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=listConnectorsFromCluster||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
"method=listConnectorsFromCluster||connectClusterId={}||connectClusterSuitableUrl={}||errMsg=exception",
|
||||
connectCluster.getId(), connectCluster.getSuitableRequestUrl(), e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
@@ -153,16 +77,6 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
|
||||
return this.getConnectorInfoFromCluster(connectCluster, connectorName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<String>> getConnectorTopicsFromCluster(Long connectClusterId, String connectorName) {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
return this.getConnectorTopicsFromCluster(connectCluster, connectorName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(Long connectClusterId, String connectorName) {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
@@ -174,270 +88,26 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<KSConnector> getAllConnectorInfoFromCluster(Long connectClusterId, String connectorName) {
|
||||
public Result<KSConnector> getConnectorFromKafka(Long connectClusterId, String connectorName) {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
Result<KSConnectorInfo> connectorResult = this.getConnectorInfoFromCluster(connectCluster, connectorName);
|
||||
if (connectorResult.failed()) {
|
||||
LOGGER.error(
|
||||
"method=getAllConnectorInfoFromCluster||connectClusterId={}||connectorName={}||result={}",
|
||||
connectClusterId, connectorName, connectorResult
|
||||
);
|
||||
|
||||
return Result.buildFromIgnoreData(connectorResult);
|
||||
}
|
||||
|
||||
Result<List<String>> topicNameListResult = this.getConnectorTopicsFromCluster(connectCluster, connectorName);
|
||||
if (topicNameListResult.failed()) {
|
||||
LOGGER.error(
|
||||
"method=getAllConnectorInfoFromCluster||connectClusterId={}||connectorName={}||result={}",
|
||||
connectClusterId, connectorName, connectorResult
|
||||
);
|
||||
}
|
||||
|
||||
Result<KSConnectorStateInfo> stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName);
|
||||
if (stateInfoResult.failed()) {
|
||||
LOGGER.error(
|
||||
"method=getAllConnectorInfoFromCluster||connectClusterId={}||connectorName={}||result={}",
|
||||
connectClusterId, connectorName, connectorResult
|
||||
);
|
||||
Result<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> fullInfoResult = this.getConnectorFullInfoFromKafka(connectCluster, connectorName);
|
||||
if (fullInfoResult.failed()) {
|
||||
return Result.buildFromIgnoreData(fullInfoResult);
|
||||
}
|
||||
|
||||
return Result.buildSuc(ConnectConverter.convert2KSConnector(
|
||||
connectCluster.getKafkaClusterPhyId(),
|
||||
connectCluster.getId(),
|
||||
connectorResult.getData(),
|
||||
stateInfoResult.getData(),
|
||||
topicNameListResult.getData()
|
||||
fullInfoResult.getData().v1(),
|
||||
fullInfoResult.getData().v3(),
|
||||
fullInfoResult.getData().v2()
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> resumeConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.putJsonForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.ENABLE.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=ConnectorServiceImpl||method=resumeConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> restartConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.postObjectWithJsonContent(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.RESTART.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=restartConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> stopConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.putJsonForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.DISABLE.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=stopConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> deleteConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.deleteWithParamsAndHeader(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
this.deleteConnectorInDB(connectClusterId, connectorName);
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=deleteConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
ConnectorInfo connectorInfo = restTool.putJsonForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName),
|
||||
configs,
|
||||
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.EDIT.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
ConvertUtil.obj2Json(configs)
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=updateConnectorConfig||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchReplace(Long kafkaClusterPhyId, Long connectClusterId, List<KSConnector> connectorList, Set<String> allConnectorNameSet) {
|
||||
List<ConnectorPO> poList = this.listByConnectClusterIdFromDB(connectClusterId);
|
||||
|
||||
Map<String, ConnectorPO> oldPOMap = new HashMap<>();
|
||||
poList.forEach(elem -> oldPOMap.put(elem.getConnectorName(), elem));
|
||||
|
||||
for (KSConnector connector: connectorList) {
|
||||
try {
|
||||
ConnectorPO oldPO = oldPOMap.remove(connector.getConnectorName());
|
||||
if (oldPO == null) {
|
||||
oldPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class);
|
||||
connectorDAO.insert(oldPO);
|
||||
} else {
|
||||
ConnectorPO newPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class);
|
||||
newPO.setId(oldPO.getId());
|
||||
connectorDAO.updateById(newPO);
|
||||
}
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
oldPOMap.values().forEach(elem -> {
|
||||
if (allConnectorNameSet.contains(elem.getConnectorName())) {
|
||||
// 当前connector还存在
|
||||
return;
|
||||
}
|
||||
|
||||
// 当前connector不存在了,则进行删除
|
||||
connectorDAO.deleteById(elem.getId());
|
||||
});
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addNewToDB(KSConnector connector) {
|
||||
try {
|
||||
connectorDAO.insert(ConvertUtil.obj2Obj(connector, ConnectorPO.class));
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConnectorPO> listByKafkaClusterIdFromDB(Long kafkaClusterPhyId) {
|
||||
LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
@@ -482,53 +152,98 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeMirrorMakerInfo(ConnectCluster connectCluster, List<KSConnector> connectorList) {
|
||||
List<KSConnector> sourceConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList());
|
||||
if (sourceConnectorList.isEmpty()) {
|
||||
return;
|
||||
public Result<Tuple<Set<String>, List<KSConnector>>> getDataFromKafka(ConnectCluster connectCluster) {
|
||||
Result<List<String>> nameListResult = this.listConnectorsFromCluster(connectCluster);
|
||||
if (nameListResult.failed()) {
|
||||
return Result.buildFromIgnoreData(nameListResult);
|
||||
}
|
||||
|
||||
List<KSConnector> heartBeatConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE)).collect(Collectors.toList());
|
||||
List<KSConnector> checkpointConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE)).collect(Collectors.toList());
|
||||
|
||||
Map<String, String> heartbeatMap = this.buildMirrorMakerMap(connectCluster, heartBeatConnectorList);
|
||||
Map<String, String> checkpointMap = this.buildMirrorMakerMap(connectCluster, checkpointConnectorList);
|
||||
|
||||
for (KSConnector sourceConnector : sourceConnectorList) {
|
||||
Result<KSConnectorInfo> ret = this.getConnectorInfoFromCluster(connectCluster, sourceConnector.getConnectorName());
|
||||
|
||||
if (!ret.hasData()) {
|
||||
LOGGER.error(
|
||||
"method=completeMirrorMakerInfo||connectClusterId={}||connectorName={}||get connectorInfo fail!",
|
||||
connectCluster.getId(), sourceConnector.getConnectorName()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
KSConnectorInfo ksConnectorInfo = ret.getData();
|
||||
String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
|
||||
String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
|
||||
|
||||
if (ValidateUtils.anyBlank(targetServers, sourceServers)) {
|
||||
// 逐个获取
|
||||
List<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> connectorFullInfoList = new ArrayList<>();
|
||||
for (String connectorName: nameListResult.getData()) {
|
||||
Result<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> ksConnectorResult = this.getConnectorFullInfoFromKafka(connectCluster, connectorName);
|
||||
if (ksConnectorResult.failed()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String[] targetBrokerList = getBrokerList(targetServers);
|
||||
String[] sourceBrokerList = getBrokerList(sourceServers);
|
||||
sourceConnector.setHeartbeatConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, heartbeatMap));
|
||||
sourceConnector.setCheckpointConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, checkpointMap));
|
||||
connectorFullInfoList.add(ksConnectorResult.getData());
|
||||
}
|
||||
|
||||
// 返回结果
|
||||
return Result.buildSuc(new Tuple<>(
|
||||
new HashSet<>(nameListResult.getData()),
|
||||
ConnectConverter.convertAndSupplyMirrorMakerInfo(connectCluster, connectorFullInfoList)) // 转换并补充mm2相关信息
|
||||
);
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
private int deleteConnectorInDB(Long connectClusterId, String connectorName) {
|
||||
@Override
|
||||
public void writeToDB(Long connectClusterId, Set<String> fullNameSet, List<KSConnector> dataList) {
|
||||
List<ConnectorPO> poList = this.listByConnectClusterIdFromDB(connectClusterId);
|
||||
|
||||
Map<String, ConnectorPO> oldPOMap = new HashMap<>();
|
||||
poList.forEach(elem -> oldPOMap.put(elem.getConnectorName(), elem));
|
||||
|
||||
for (KSConnector connector: dataList) {
|
||||
try {
|
||||
ConnectorPO oldPO = oldPOMap.remove(connector.getConnectorName());
|
||||
if (oldPO == null) {
|
||||
oldPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class);
|
||||
connectorDAO.insert(oldPO);
|
||||
continue;
|
||||
}
|
||||
|
||||
ConnectorPO newPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class);
|
||||
newPO.setId(oldPO.getId());
|
||||
if (!ValidateUtils.isBlank(oldPO.getCheckpointConnectorName())
|
||||
&& ValidateUtils.isBlank(newPO.getCheckpointConnectorName())
|
||||
&& fullNameSet.contains(oldPO.getCheckpointConnectorName())) {
|
||||
// 新的po里面没有checkpoint的信息,但是db中的数据显示有,且集群中有该connector,则保留该checkpoint数据
|
||||
newPO.setCheckpointConnectorName(oldPO.getCheckpointConnectorName());
|
||||
}
|
||||
|
||||
if (!ValidateUtils.isBlank(oldPO.getHeartbeatConnectorName())
|
||||
&& ValidateUtils.isBlank(newPO.getHeartbeatConnectorName())
|
||||
&& fullNameSet.contains(oldPO.getHeartbeatConnectorName())) {
|
||||
// 新的po里面没有checkpoint的信息,但是db中的数据显示有,且集群中有该connector,则保留该checkpoint数据
|
||||
newPO.setHeartbeatConnectorName(oldPO.getHeartbeatConnectorName());
|
||||
}
|
||||
|
||||
connectorDAO.updateById(newPO);
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// ignore
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=writeToDB||connectClusterId={}||connectorName={}||errMsg=exception",
|
||||
connector.getConnectClusterId(), connector.getConnectorName(), e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
oldPOMap.values().forEach(elem -> {
|
||||
if (fullNameSet.contains(elem.getConnectorName())) {
|
||||
// 当前connector还存在
|
||||
return;
|
||||
}
|
||||
|
||||
// 当前connector不存在了,则进行删除
|
||||
connectorDAO.deleteById(elem.getId());
|
||||
});
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId);
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName);
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getKafkaClusterPhyId, clusterPhyId);
|
||||
|
||||
return connectorDAO.delete(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<KSConnectorInfo> getConnectorInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
|
||||
try {
|
||||
ConnectorInfo connectorInfo = restTool.getForObject(
|
||||
@@ -594,90 +309,37 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
|
||||
}
|
||||
}
|
||||
|
||||
private void updateStatus(ConnectCluster connectCluster, Long connectClusterId, String connectorName) {
|
||||
try {
|
||||
// 延迟3秒
|
||||
BackoffUtils.backoff(2000);
|
||||
|
||||
Result<KSConnectorStateInfo> stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName);
|
||||
if (stateInfoResult.failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ConnectorPO po = new ConnectorPO();
|
||||
po.setConnectClusterId(connectClusterId);
|
||||
po.setConnectorName(connectorName);
|
||||
po.setState(stateInfoResult.getData().getConnector().getState());
|
||||
|
||||
LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId);
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName);
|
||||
|
||||
connectorDAO.update(po, lambdaQueryWrapper);
|
||||
} catch (Exception e) {
|
||||
private Result<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> getConnectorFullInfoFromKafka(ConnectCluster connectCluster, String connectorName) {
|
||||
Result<KSConnectorInfo> connectorResult = this.getConnectorInfoFromCluster(connectCluster, connectorName);
|
||||
if (connectorResult.failed()) {
|
||||
LOGGER.error(
|
||||
"method=updateStatus||connectClusterId={}||connectorName={}||errMsg=exception",
|
||||
connectClusterId, connectorName, e
|
||||
"method=getConnectorAllInfoFromKafka||connectClusterId={}||connectClusterSuitableUrl={}||result={}||errMsg=get connectors info from cluster failed",
|
||||
connectCluster.getId(), connectCluster.getSuitableRequestUrl(), connectorResult
|
||||
);
|
||||
|
||||
return Result.buildFromIgnoreData(connectorResult);
|
||||
}
|
||||
|
||||
Result<List<String>> topicNameListResult = this.getConnectorTopicsFromCluster(connectCluster, connectorName);
|
||||
if (topicNameListResult.failed()) {
|
||||
LOGGER.error(
|
||||
"method=getConnectorAllInfoFromKafka||connectClusterId={}||connectClusterSuitableUrl={}||result={}||errMsg=get connectors topics from cluster failed",
|
||||
connectCluster.getId(), connectCluster.getSuitableRequestUrl(), topicNameListResult
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> buildMirrorMakerMap(ConnectCluster connectCluster, List<KSConnector> ksConnectorList) {
|
||||
Map<String, String> bindMap = new HashMap<>();
|
||||
|
||||
for (KSConnector ksConnector : ksConnectorList) {
|
||||
Result<KSConnectorInfo> ret = this.getConnectorInfoFromCluster(connectCluster, ksConnector.getConnectorName());
|
||||
|
||||
if (!ret.hasData()) {
|
||||
LOGGER.error(
|
||||
"method=buildMirrorMakerMap||connectClusterId={}||connectorName={}||get connectorInfo fail!",
|
||||
connectCluster.getId(), ksConnector.getConnectorName()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
KSConnectorInfo ksConnectorInfo = ret.getData();
|
||||
String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
|
||||
String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
|
||||
|
||||
if (ValidateUtils.anyBlank(targetServers, sourceServers)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String[] targetBrokerList = getBrokerList(targetServers);
|
||||
String[] sourceBrokerList = getBrokerList(sourceServers);
|
||||
for (String targetBroker : targetBrokerList) {
|
||||
for (String sourceBroker : sourceBrokerList) {
|
||||
bindMap.put(targetBroker + "@" + sourceBroker, ksConnector.getConnectorName());
|
||||
}
|
||||
}
|
||||
|
||||
Result<KSConnectorStateInfo> stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName);
|
||||
if (stateInfoResult.failed()) {
|
||||
LOGGER.error(
|
||||
"method=getConnectorAllInfoFromKafka||connectClusterId={}||connectClusterSuitableUrl={}||result={}||errMsg=get connectors state from cluster failed",
|
||||
connectCluster.getId(), connectCluster.getSuitableRequestUrl(), stateInfoResult
|
||||
);
|
||||
}
|
||||
return bindMap;
|
||||
}
|
||||
|
||||
private String findBindConnector(String[] targetBrokerList, String[] sourceBrokerList, Map<String, String> connectorBindMap) {
|
||||
for (String targetBroker : targetBrokerList) {
|
||||
for (String sourceBroker : sourceBrokerList) {
|
||||
String connectorName = connectorBindMap.get(targetBroker + "@" + sourceBroker);
|
||||
if (connectorName != null) {
|
||||
return connectorName;
|
||||
}
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
private String[] getBrokerList(String str) {
|
||||
if (ValidateUtils.isBlank(str)) {
|
||||
return new String[0];
|
||||
}
|
||||
if (str.contains(";")) {
|
||||
return str.split(";");
|
||||
}
|
||||
if (str.contains(",")) {
|
||||
return str.split(",");
|
||||
}
|
||||
return new String[]{str};
|
||||
return Result.buildSuc(new Triple<>(
|
||||
connectorResult.getData(),
|
||||
topicNameListResult.getData(),
|
||||
stateInfoResult.getData()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,352 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.connect.connector.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.component.RestTool;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectorDAO;
|
||||
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_CONNECT_CONNECTOR;
|
||||
|
||||
@Service
|
||||
public class OpConnectorServiceImpl extends BaseVersionControlService implements OpConnectorService {
|
||||
private static final ILog LOGGER = LogFactory.getLog(OpConnectorServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private RestTool restTool;
|
||||
|
||||
@Autowired
|
||||
private ConnectorDAO connectorDAO;
|
||||
|
||||
@Autowired
|
||||
private ConnectClusterService connectClusterService;
|
||||
|
||||
@Autowired
|
||||
private OpLogWrapService opLogWrapService;
|
||||
|
||||
private static final String GET_CONNECTOR_STATUS_URI = "/connectors/%s/status";
|
||||
|
||||
private static final String CREATE_CONNECTOR_URI = "/connectors";
|
||||
private static final String RESUME_CONNECTOR_URI = "/connectors/%s/resume";
|
||||
private static final String RESTART_CONNECTOR_URI = "/connectors/%s/restart";
|
||||
private static final String PAUSE_CONNECTOR_URI = "/connectors/%s/pause";
|
||||
private static final String DELETE_CONNECTOR_URI = "/connectors/%s";
|
||||
private static final String UPDATE_CONNECTOR_CONFIG_URI = "/connectors/%s/config";
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return SERVICE_OP_CONNECT_CONNECTOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<KSConnectorInfo> createConnector(Long connectClusterId, String connectorName, Properties configs, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
// 构造参数
|
||||
Properties props = new Properties();
|
||||
props.put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, connectorName);
|
||||
props.put("config", configs);
|
||||
|
||||
ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent(
|
||||
connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI,
|
||||
props,
|
||||
ConnectorInfo.class
|
||||
);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.ADD.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
ConvertUtil.obj2Json(configs)
|
||||
));
|
||||
|
||||
KSConnectorInfo connector = new KSConnectorInfo();
|
||||
connector.setConnectClusterId(connectClusterId);
|
||||
connector.setConfig(connectorInfo.config());
|
||||
connector.setName(connectorInfo.name());
|
||||
connector.setTasks(connectorInfo.tasks());
|
||||
connector.setType(connectorInfo.type());
|
||||
|
||||
return Result.buildSuc(connector);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=createConnector||connectClusterId={}||connectorName={}||configs={}||operator={}||errMsg=exception",
|
||||
connectClusterId, connectorName, configs, operator, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> resumeConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.putJsonForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.ENABLE.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=ConnectorServiceImpl||method=resumeConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> restartConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.postObjectWithJsonContent(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.RESTART.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=restartConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> stopConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.putJsonForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.DISABLE.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=stopConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> deleteConnector(Long connectClusterId, String connectorName, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
restTool.deleteWithParamsAndHeader(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
new HashMap<>(),
|
||||
String.class
|
||||
);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
""
|
||||
));
|
||||
|
||||
this.deleteConnectorInDB(connectClusterId, connectorName);
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=deleteConnector||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) {
|
||||
try {
|
||||
ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
|
||||
if (ValidateUtils.isNull(connectCluster)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
|
||||
}
|
||||
|
||||
ConnectorInfo connectorInfo = restTool.putJsonForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName),
|
||||
configs,
|
||||
ConnectorInfo.class
|
||||
);
|
||||
|
||||
this.updateStatus(connectCluster, connectClusterId, connectorName);
|
||||
|
||||
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
|
||||
operator,
|
||||
OperationEnum.EDIT.getDesc(),
|
||||
ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
|
||||
MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
|
||||
ConvertUtil.obj2Json(configs)
|
||||
));
|
||||
|
||||
return Result.buildSuc();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=updateConnectorConfig||connectClusterId={}||errMsg=exception",
|
||||
connectClusterId, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addNewToDB(KSConnector connector) {
|
||||
try {
|
||||
connectorDAO.insert(ConvertUtil.obj2Obj(connector, ConnectorPO.class));
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
private int deleteConnectorInDB(Long connectClusterId, String connectorName) {
|
||||
LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId);
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName);
|
||||
|
||||
return connectorDAO.delete(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
private Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
|
||||
try {
|
||||
KSConnectorStateInfo connectorStateInfo = restTool.getForObject(
|
||||
connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName),
|
||||
new HashMap<>(),
|
||||
KSConnectorStateInfo.class
|
||||
);
|
||||
|
||||
return Result.buildSuc(connectorStateInfo);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=getConnectorStateInfoFromCluster||connectClusterId={}||connectorName={}||errMsg=exception",
|
||||
connectCluster.getId(), connectorName, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void updateStatus(ConnectCluster connectCluster, Long connectClusterId, String connectorName) {
|
||||
try {
|
||||
// 延迟3秒
|
||||
BackoffUtils.backoff(2000);
|
||||
|
||||
Result<KSConnectorStateInfo> stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName);
|
||||
if (stateInfoResult.failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ConnectorPO po = new ConnectorPO();
|
||||
po.setConnectClusterId(connectClusterId);
|
||||
po.setConnectorName(connectorName);
|
||||
po.setState(stateInfoResult.getData().getConnector().getState());
|
||||
|
||||
LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId);
|
||||
lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName);
|
||||
|
||||
connectorDAO.update(po, lambdaQueryWrapper);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=updateStatus||connectClusterId={}||connectorName={}||errMsg=exception",
|
||||
connectClusterId, connectorName, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2.MirrorMakerMetricESDAO;
|
||||
@@ -49,7 +49,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
|
||||
* @date 2022/12/15
|
||||
*/
|
||||
@Service
|
||||
public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService implements MirrorMakerMetricService {
|
||||
public class MirrorMakerMetricServiceImpl extends BaseConnectMetricService implements MirrorMakerMetricService {
|
||||
protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricServiceImpl.class);
|
||||
|
||||
public static final String MIRROR_MAKER_METHOD_DO_NOTHING = "doNothing";
|
||||
@@ -190,7 +190,7 @@ public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService imp
|
||||
|
||||
multiLinesVO.setMetricLines(metricLines);
|
||||
multiLinesVOS.add(multiLinesVO);
|
||||
}catch (Exception e){
|
||||
} catch (Exception e){
|
||||
LOGGER.error("method=metricMap2VO||connectClusterId={}||msg=exception!", connectClusterId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +78,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
|
||||
}
|
||||
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers());
|
||||
props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d||timestamp=%d", clusterPhy.getId(), System.currentTimeMillis()));
|
||||
|
||||
adminClient = KSPartialKafkaAdminClient.create(props);
|
||||
KSListGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups(
|
||||
@@ -178,6 +179,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
|
||||
}
|
||||
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers());
|
||||
props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d||timestamp=%d", clusterPhy.getId(), System.currentTimeMillis()));
|
||||
|
||||
adminClient = KSPartialKafkaAdminClient.create(props);
|
||||
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.meta;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Kafka元信息服务接口
|
||||
*/
|
||||
public interface MetaDataService<T> {
|
||||
/**
|
||||
* 从Kafka中获取数据
|
||||
* @param connectCluster connect集群
|
||||
* @return 全部资源列表, 成功的资源列表
|
||||
*/
|
||||
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ConnectCluster connectCluster) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }
|
||||
|
||||
/**
|
||||
* 从Kafka中获取数据
|
||||
* @param clusterPhy kafka集群
|
||||
* @return 全部资源集合, 成功的资源列表
|
||||
*/
|
||||
default Result<List<T>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new ArrayList<>()); }
|
||||
|
||||
/**
|
||||
* 元信息同步至DB中
|
||||
* @param clusterId 集群ID
|
||||
* @param fullResSet 全部资源列表
|
||||
* @param dataList 成功的资源列表
|
||||
*/
|
||||
default void writeToDB(Long clusterId, Set<String> fullResSet, List<T> dataList) {}
|
||||
|
||||
/**
|
||||
* 元信息同步至DB中
|
||||
* @param clusterId 集群ID
|
||||
* @param dataList 成功的资源列表
|
||||
*/
|
||||
default void writeToDB(Long clusterId, List<T> dataList) {}
|
||||
|
||||
/**
|
||||
* 依据kafka集群ID删除数据
|
||||
* @param clusterPhyId kafka集群ID
|
||||
*/
|
||||
int deleteInDBByKafkaClusterId(Long clusterPhyId);
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import java.util.stream.Collectors;
|
||||
* @author wyb
|
||||
* @date 2022/11/9
|
||||
*/
|
||||
public abstract class BaseConnectorMetricService extends BaseConnectorVersionControlService{
|
||||
public abstract class BaseConnectMetricService extends BaseConnectVersionControlService {
|
||||
private List<String> metricNames = new ArrayList<>();
|
||||
|
||||
@PostConstruct
|
||||
@@ -14,7 +14,7 @@ import javax.annotation.Nullable;
|
||||
* @author wyb
|
||||
* @date 2022/11/8
|
||||
*/
|
||||
public abstract class BaseConnectorVersionControlService extends BaseVersionControlService {
|
||||
public abstract class BaseConnectVersionControlService extends BaseVersionControlService {
|
||||
|
||||
@Autowired
|
||||
ConnectClusterService connectClusterService;
|
||||
@@ -24,6 +24,8 @@ public class ConnectorMetricVersionItems extends BaseMetricVersionMetric {
|
||||
|
||||
public static final String CONNECTOR_METRIC_HEALTH_STATE = "HealthState";
|
||||
|
||||
public static final String CONNECTOR_METRIC_RUNNING_STATUS = "RunningStatus";
|
||||
|
||||
public static final String CONNECTOR_METRIC_CONNECTOR_TOTAL_TASK_COUNT = "ConnectorTotalTaskCount";
|
||||
|
||||
public static final String CONNECTOR_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
|
||||
@@ -128,6 +130,9 @@ public class ConnectorMetricVersionItems extends BaseMetricVersionMetric {
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(CONNECTOR_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
|
||||
.extendMethod(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE));
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(CONNECTOR_METRIC_RUNNING_STATUS).unit("0:UNASSIGNED 1:RUNNING 2:PAUSED 3:FAILED 4:DESTROYED -1:UNKNOWN").desc("运行状态(0:UNASSIGNED 1:RUNNING 2:PAUSED 3:FAILED 4:DESTROYED -1:UNKNOWN)").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(CONNECTOR_METHOD_GET_METRIC_RUNNING_STATUS));
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(CONNECTOR_METRIC_HEALTH_CHECK_PASSED).unit("个").desc("健康项检查通过数").category(CATEGORY_HEALTH)
|
||||
.extendMethod(CONNECTOR_METHOD_GET_METRIC_HEALTH_SCORE));
|
||||
|
||||
@@ -1,19 +1,11 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ZookeeperService {
|
||||
/**
|
||||
* 从ZK集群中获取ZK信息
|
||||
*/
|
||||
Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig);
|
||||
|
||||
void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList);
|
||||
|
||||
public interface ZookeeperService extends MetaDataService<ZookeeperInfo> {
|
||||
List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId);
|
||||
|
||||
/**
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
@@ -22,10 +23,8 @@ import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
@@ -35,14 +34,14 @@ public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
private ZookeeperDAO zookeeperDAO;
|
||||
|
||||
@Override
|
||||
public Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) {
|
||||
public Result<List<ZookeeperInfo>> getDataFromKafka(ClusterPhy clusterPhy) {
|
||||
List<Tuple<String, Integer>> addressList = null;
|
||||
try {
|
||||
addressList = ZookeeperUtils.connectStringParser(zookeeperAddress);
|
||||
addressList = ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!",
|
||||
clusterPhyId, zookeeperAddress, e
|
||||
"method=getDataFromKafka||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!",
|
||||
clusterPhy.getId(), clusterPhy.getZookeeper(), e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage());
|
||||
@@ -51,24 +50,25 @@ public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
List<ZookeeperInfo> aliveZKList = new ArrayList<>();
|
||||
for (Tuple<String, Integer> hostPort: addressList) {
|
||||
aliveZKList.add(this.getFromZookeeperCluster(
|
||||
clusterPhyId,
|
||||
clusterPhy.getId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
zkConfig
|
||||
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class)
|
||||
));
|
||||
}
|
||||
|
||||
return Result.buildSuc(aliveZKList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList) {
|
||||
public void writeToDB(Long clusterId, List<ZookeeperInfo> dataList) {
|
||||
// DB 中的信息
|
||||
List<ZookeeperInfoPO> dbInfoList = this.listRawFromDBByCluster(clusterPhyId);
|
||||
Map<String, ZookeeperInfoPO> dbMap = new HashMap<>();
|
||||
dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem));
|
||||
Map<String, ZookeeperInfoPO> dbMap = this.listRawFromDBByCluster(clusterId)
|
||||
.stream()
|
||||
.collect(Collectors.toMap(elem -> elem.getHost() + elem.getPort(), elem -> elem, (oldValue, newValue) -> newValue));
|
||||
|
||||
// 新获取到的信息
|
||||
List<ZookeeperInfoPO> newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class);
|
||||
List<ZookeeperInfoPO> newInfoList = ConvertUtil.list2List(dataList, ZookeeperInfoPO.class);
|
||||
for (ZookeeperInfoPO newInfo: newInfoList) {
|
||||
try {
|
||||
ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort());
|
||||
@@ -87,7 +87,7 @@ public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
zookeeperDAO.updateById(newInfo);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e);
|
||||
LOGGER.error("method=writeToDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterId, newInfo, e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,11 +96,19 @@ public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
try {
|
||||
zookeeperDAO.deleteById(entry.getValue().getId());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e);
|
||||
LOGGER.error("method=writeToDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterId, entry.getValue(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<ZookeeperInfoPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
return zookeeperDAO.delete(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId) {
|
||||
return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class);
|
||||
|
||||
Reference in New Issue
Block a user