mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 12:58:04 +08:00
Connect相关代码
This commit is contained in:
@@ -8,9 +8,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDT
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConsumerAssignment;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
@@ -35,14 +39,13 @@ import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
|
||||
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
||||
import org.apache.kafka.clients.admin.MemberDescription;
|
||||
import org.apache.kafka.common.ConsumerGroupState;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -51,6 +54,8 @@ import org.springframework.stereotype.Component;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE;
|
||||
|
||||
@Component
|
||||
public class GroupManagerImpl implements GroupManager {
|
||||
private static final ILog log = LogFactory.getLog(GroupManagerImpl.class);
|
||||
@@ -70,6 +75,9 @@ public class GroupManagerImpl implements GroupManager {
|
||||
@Autowired
|
||||
private GroupMetricESDAO groupMetricESDAO;
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhyId,
|
||||
String topicName,
|
||||
@@ -140,6 +148,11 @@ public class GroupManagerImpl implements GroupManager {
|
||||
String groupName,
|
||||
List<String> latestMetricNames,
|
||||
PaginationSortDTO dto) throws NotExistException, AdminOperateException {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
|
||||
if (clusterPhy == null) {
|
||||
return PaginationResult.buildFailure(MsgConstant.getClusterPhyNotExist(clusterPhyId), dto);
|
||||
}
|
||||
|
||||
// 获取消费组消费的TopicPartition列表
|
||||
Map<TopicPartition, Long> consumedOffsetMap = groupService.getGroupOffsetFromKafka(clusterPhyId, groupName);
|
||||
List<Integer> partitionList = consumedOffsetMap.keySet()
|
||||
@@ -150,13 +163,18 @@ public class GroupManagerImpl implements GroupManager {
|
||||
Collections.sort(partitionList);
|
||||
|
||||
// 获取消费组当前运行信息
|
||||
ConsumerGroupDescription groupDescription = groupService.getGroupDescriptionFromKafka(clusterPhyId, groupName);
|
||||
KSGroupDescription groupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, groupName);
|
||||
|
||||
// 转换存储格式
|
||||
Map<TopicPartition, MemberDescription> tpMemberMap = new HashMap<>();
|
||||
for (MemberDescription description: groupDescription.members()) {
|
||||
for (TopicPartition tp: description.assignment().topicPartitions()) {
|
||||
tpMemberMap.put(tp, description);
|
||||
Map<TopicPartition, KSMemberDescription> tpMemberMap = new HashMap<>();
|
||||
|
||||
//如果不是connect集群
|
||||
if (!groupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
|
||||
for (KSMemberDescription description : groupDescription.members()) {
|
||||
KSMemberConsumerAssignment assignment = (KSMemberConsumerAssignment) description.assignment();
|
||||
for (TopicPartition tp : assignment.topicPartitions()) {
|
||||
tpMemberMap.put(tp, description);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,11 +191,11 @@ public class GroupManagerImpl implements GroupManager {
|
||||
vo.setTopicName(topicName);
|
||||
vo.setPartitionId(groupMetrics.getPartitionId());
|
||||
|
||||
MemberDescription memberDescription = tpMemberMap.get(new TopicPartition(topicName, groupMetrics.getPartitionId()));
|
||||
if (memberDescription != null) {
|
||||
vo.setMemberId(memberDescription.consumerId());
|
||||
vo.setHost(memberDescription.host());
|
||||
vo.setClientId(memberDescription.clientId());
|
||||
KSMemberDescription ksMemberDescription = tpMemberMap.get(new TopicPartition(topicName, groupMetrics.getPartitionId()));
|
||||
if (ksMemberDescription != null) {
|
||||
vo.setMemberId(ksMemberDescription.consumerId());
|
||||
vo.setHost(ksMemberDescription.host());
|
||||
vo.setClientId(ksMemberDescription.clientId());
|
||||
}
|
||||
|
||||
vo.setLatestMetrics(groupMetrics);
|
||||
@@ -203,7 +221,12 @@ public class GroupManagerImpl implements GroupManager {
|
||||
return rv;
|
||||
}
|
||||
|
||||
ConsumerGroupDescription description = groupService.getGroupDescriptionFromKafka(dto.getClusterId(), dto.getGroupName());
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterId());
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getClusterId()));
|
||||
}
|
||||
|
||||
KSGroupDescription description = groupService.getGroupDescriptionFromKafka(clusterPhy, dto.getGroupName());
|
||||
if (ConsumerGroupState.DEAD.equals(description.state()) && !dto.isCreateIfNotExist()) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "group不存在, 重置失败");
|
||||
}
|
||||
@@ -345,32 +368,4 @@ public class GroupManagerImpl implements GroupManager {
|
||||
dto
|
||||
);
|
||||
}
|
||||
|
||||
private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(String groupName, String state, List<GroupTopicMember> groupTopicList, List<GroupMetrics> metricsList) {
|
||||
if (metricsList == null) {
|
||||
metricsList = new ArrayList<>();
|
||||
}
|
||||
|
||||
// <TopicName, GroupMetrics>
|
||||
Map<String, GroupMetrics> metricsMap = new HashMap<>();
|
||||
for (GroupMetrics metrics : metricsList) {
|
||||
if (!groupName.equals(metrics.getGroup())) continue;
|
||||
metricsMap.put(metrics.getTopic(), metrics);
|
||||
}
|
||||
|
||||
List<GroupTopicOverviewVO> voList = new ArrayList<>();
|
||||
for (GroupTopicMember po : groupTopicList) {
|
||||
GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class);
|
||||
vo.setGroupName(groupName);
|
||||
vo.setState(state);
|
||||
GroupMetrics metrics = metricsMap.get(po.getTopicName());
|
||||
if (metrics != null) {
|
||||
vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG)));
|
||||
}
|
||||
|
||||
voList.add(vo);
|
||||
}
|
||||
return voList;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
|
||||
|
||||
@Component
|
||||
public class TopicConfigManagerImpl extends BaseVersionControlService implements TopicConfigManager {
|
||||
public class TopicConfigManagerImpl extends BaseKafkaVersionControlService implements TopicConfigManager {
|
||||
private static final ILog log = LogFactory.getLog(TopicConfigManagerImpl.class);
|
||||
|
||||
private static final String GET_DEFAULT_TOPIC_CONFIG = "getDefaultTopicConfig";
|
||||
|
||||
@@ -20,7 +20,7 @@ public interface VersionControlManager {
|
||||
* 获取当前ks所有支持的kafka版本
|
||||
* @return
|
||||
*/
|
||||
Result<Map<String, Long>> listAllVersions();
|
||||
Result<Map<String, Long>> listAllKafkaVersions();
|
||||
|
||||
/**
|
||||
* 获取全部集群 clusterId 中类型为 type 的指标,不论支持不支持
|
||||
@@ -28,7 +28,7 @@ public interface VersionControlManager {
|
||||
* @param type
|
||||
* @return
|
||||
*/
|
||||
Result<List<VersionItemVO>> listClusterVersionControlItem(Long clusterId, Integer type);
|
||||
Result<List<VersionItemVO>> listKafkaClusterVersionControlItem(Long clusterId, Integer type);
|
||||
|
||||
/**
|
||||
* 获取当前用户设置的用于展示的指标配置
|
||||
|
||||
@@ -17,6 +17,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.VersionUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -92,6 +93,9 @@ public class VersionControlManagerImpl implements VersionControlManager {
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_BYTES_OUT, true));
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Autowired
|
||||
private VersionControlService versionControlService;
|
||||
|
||||
@@ -107,7 +111,13 @@ public class VersionControlManagerImpl implements VersionControlManager {
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_BROKER.getCode()), VersionItemVO.class));
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_PARTITION.getCode()), VersionItemVO.class));
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_REPLICATION.getCode()), VersionItemVO.class));
|
||||
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_ZOOKEEPER.getCode()), VersionItemVO.class));
|
||||
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_CONNECT_CLUSTER.getCode()), VersionItemVO.class));
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_CONNECT_CONNECTOR.getCode()), VersionItemVO.class));
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_CONNECT_MIRROR_MAKER.getCode()), VersionItemVO.class));
|
||||
|
||||
allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(WEB_OP.getCode()), VersionItemVO.class));
|
||||
|
||||
Map<String, VersionItemVO> map = allVersionItemVO.stream().collect(
|
||||
@@ -121,18 +131,20 @@ public class VersionControlManagerImpl implements VersionControlManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Map<String, Long>> listAllVersions() {
|
||||
public Result<Map<String, Long>> listAllKafkaVersions() {
|
||||
return Result.buildSuc(VersionEnum.allVersionsWithOutMax());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<VersionItemVO>> listClusterVersionControlItem(Long clusterId, Integer type) {
|
||||
public Result<List<VersionItemVO>> listKafkaClusterVersionControlItem(Long clusterId, Integer type) {
|
||||
List<VersionControlItem> allItem = versionControlService.listVersionControlItem(type);
|
||||
List<VersionItemVO> versionItemVOS = new ArrayList<>();
|
||||
|
||||
String versionStr = clusterPhyService.getVersionFromCacheFirst(clusterId);
|
||||
|
||||
for (VersionControlItem item : allItem){
|
||||
VersionItemVO itemVO = ConvertUtil.obj2Obj(item, VersionItemVO.class);
|
||||
boolean support = versionControlService.isClusterSupport(clusterId, item);
|
||||
boolean support = versionControlService.isClusterSupport(versionStr, item);
|
||||
|
||||
itemVO.setSupport(support);
|
||||
itemVO.setDesc(itemSupportDesc(item, support));
|
||||
@@ -145,7 +157,7 @@ public class VersionControlManagerImpl implements VersionControlManager {
|
||||
|
||||
@Override
|
||||
public Result<List<UserMetricConfigVO>> listUserMetricItem(Long clusterId, Integer type, String operator) {
|
||||
Result<List<VersionItemVO>> ret = listClusterVersionControlItem(clusterId, type);
|
||||
Result<List<VersionItemVO>> ret = listKafkaClusterVersionControlItem(clusterId, type);
|
||||
if(null == ret || ret.failed()){
|
||||
return Result.buildFail();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user