mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-06 21:52:06 +08:00
合并Master分支
This commit is contained in:
@@ -5,13 +5,13 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<artifactId>km-biz</artifactId>
|
||||
<version>${km.revision}</version>
|
||||
<version>${revision}</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>km</artifactId>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<version>${km.revision}</version>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -202,7 +202,7 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
|
||||
//补充非zk模式的JMXPort信息
|
||||
if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) {
|
||||
JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class);
|
||||
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort()));
|
||||
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getFinallyJmxPort(String.valueOf(elem.getBrokerId()))));
|
||||
}
|
||||
|
||||
return voList;
|
||||
|
||||
@@ -62,7 +62,8 @@ public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager {
|
||||
vo.setTotalObserverCount(0);
|
||||
vo.setAliveServerCount(0);
|
||||
for (ZookeeperInfo info: infoList) {
|
||||
if (info.getRole().equals(ZKRoleEnum.LEADER.getRole())) {
|
||||
if (info.getRole().equals(ZKRoleEnum.LEADER.getRole()) || info.getRole().equals(ZKRoleEnum.STANDALONE.getRole())) {
|
||||
// leader 或者 standalone
|
||||
vo.setLeaderNode(info.getHost());
|
||||
}
|
||||
|
||||
|
||||
@@ -49,9 +49,9 @@ public class ConnectorManagerImpl implements ConnectorManager {
|
||||
|
||||
@Override
|
||||
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
|
||||
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
|
||||
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
|
||||
|
||||
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
|
||||
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
|
||||
if (createResult.failed()) {
|
||||
return Result.buildFromIgnoreData(createResult);
|
||||
}
|
||||
@@ -67,9 +67,9 @@ public class ConnectorManagerImpl implements ConnectorManager {
|
||||
|
||||
@Override
|
||||
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
|
||||
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
|
||||
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
|
||||
|
||||
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
|
||||
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
|
||||
if (createResult.failed()) {
|
||||
return Result.buildFromIgnoreData(createResult);
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -131,17 +132,17 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
} else if (checkpointResult.failed() && checkpointResult.failed()) {
|
||||
return Result.buildFromRSAndMsg(
|
||||
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
|
||||
String.format("创建 checkpoint & heartbeat 失败.\n失败信息分别为:%s\n\n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
|
||||
String.format("创建 checkpoint & heartbeat 失败.%n失败信息分别为:%s%n%n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
|
||||
);
|
||||
} else if (checkpointResult.failed()) {
|
||||
return Result.buildFromRSAndMsg(
|
||||
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
|
||||
String.format("创建 checkpoint 失败.\n失败信息分别为:%s", checkpointResult.getMessage())
|
||||
String.format("创建 checkpoint 失败.%n失败信息分别为:%s", checkpointResult.getMessage())
|
||||
);
|
||||
} else{
|
||||
return Result.buildFromRSAndMsg(
|
||||
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
|
||||
String.format("创建 heartbeat 失败.\n失败信息分别为:%s", heartbeatResult.getMessage())
|
||||
String.format("创建 heartbeat 失败.%n失败信息分别为:%s", heartbeatResult.getMessage())
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -193,7 +194,7 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
|
||||
return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -425,7 +426,7 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
public Result<List<ConnectConfigInfosVO>> validateConnectors(MirrorMakerCreateDTO dto) {
|
||||
List<ConnectConfigInfosVO> voList = new ArrayList<>();
|
||||
|
||||
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
|
||||
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig());
|
||||
if (infoResult.failed()) {
|
||||
return Result.buildFromIgnoreData(infoResult);
|
||||
}
|
||||
@@ -479,11 +480,11 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(connectCluster.getKafkaClusterPhyId()));
|
||||
}
|
||||
|
||||
if (!dto.getConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
|
||||
if (!dto.getSuitableConfig().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector缺少connector.class");
|
||||
}
|
||||
|
||||
if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
|
||||
if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getSuitableConfig().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector的connector.class类型错误");
|
||||
}
|
||||
|
||||
@@ -588,16 +589,14 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
}
|
||||
}
|
||||
|
||||
voList.forEach(elem -> {
|
||||
elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName()));
|
||||
});
|
||||
voList.forEach(elem -> elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName())));
|
||||
|
||||
return voList;
|
||||
}
|
||||
|
||||
private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
|
||||
|
||||
Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();
|
||||
Map<String, KSConnectorInfo> connectorInfoMap = new ConcurrentHashMap<>();
|
||||
|
||||
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
||||
ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
|
||||
@@ -607,12 +606,10 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
if (connectorInfoRet.hasData()) {
|
||||
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
|
||||
}
|
||||
|
||||
return connectorInfoRet.getData();
|
||||
});
|
||||
}
|
||||
|
||||
ApiCallThreadPoolService.waitResult(1000);
|
||||
ApiCallThreadPoolService.waitResult();
|
||||
|
||||
List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
|
||||
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.group;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
|
||||
@@ -39,5 +40,9 @@ public interface GroupManager {
|
||||
|
||||
Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception;
|
||||
|
||||
Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception;
|
||||
|
||||
@Deprecated
|
||||
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
|
||||
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList, Integer timeoutUnitMs);
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
|
||||
@@ -17,6 +18,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConsume
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||
@@ -32,6 +36,7 @@ import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||
@@ -40,11 +45,14 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems;
|
||||
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
|
||||
import org.apache.kafka.common.ConsumerGroupState;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
@@ -52,13 +60,14 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE;
|
||||
|
||||
@Component
|
||||
public class GroupManagerImpl implements GroupManager {
|
||||
private static final ILog log = LogFactory.getLog(GroupManagerImpl.class);
|
||||
private static final ILog LOGGER = LogFactory.getLog(GroupManagerImpl.class);
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
@@ -66,6 +75,9 @@ public class GroupManagerImpl implements GroupManager {
|
||||
@Autowired
|
||||
private GroupService groupService;
|
||||
|
||||
@Autowired
|
||||
private OpGroupService opGroupService;
|
||||
|
||||
@Autowired
|
||||
private PartitionService partitionService;
|
||||
|
||||
@@ -78,6 +90,9 @@ public class GroupManagerImpl implements GroupManager {
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Autowired
|
||||
private KSConfigUtils ksConfigUtils;
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhyId,
|
||||
String topicName,
|
||||
@@ -85,19 +100,27 @@ public class GroupManagerImpl implements GroupManager {
|
||||
String searchTopicKeyword,
|
||||
String searchGroupKeyword,
|
||||
PaginationBaseDTO dto) {
|
||||
long startTimeUnitMs = System.currentTimeMillis();
|
||||
|
||||
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto);
|
||||
|
||||
if (!paginationResult.hasData()) {
|
||||
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
|
||||
}
|
||||
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(
|
||||
clusterPhyId,
|
||||
paginationResult.getData().getBizData(),
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
|
||||
);
|
||||
|
||||
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) {
|
||||
long startTimeUnitMs = System.currentTimeMillis();
|
||||
|
||||
Group group = groupService.getGroupFromDB(clusterPhyId, groupName);
|
||||
|
||||
//没有topicMember则直接返回
|
||||
@@ -113,7 +136,14 @@ public class GroupManagerImpl implements GroupManager {
|
||||
|
||||
List<GroupMemberPO> groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList());
|
||||
|
||||
return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult);
|
||||
return PaginationResult.buildSuc(
|
||||
this.getGroupTopicOverviewVOList(
|
||||
clusterPhyId,
|
||||
groupMemberPOList,
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
|
||||
),
|
||||
paginationResult
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -121,7 +151,7 @@ public class GroupManagerImpl implements GroupManager {
|
||||
List<Group> groupList = groupService.listClusterGroups(clusterPhyId);
|
||||
|
||||
// 类型转化
|
||||
List<GroupOverviewVO> voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList());
|
||||
List<GroupOverviewVO> voList = groupList.stream().map(GroupConverter::convert2GroupOverviewVO).collect(Collectors.toList());
|
||||
|
||||
// 搜索groupName
|
||||
voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name"));
|
||||
@@ -168,9 +198,10 @@ public class GroupManagerImpl implements GroupManager {
|
||||
// 转换存储格式
|
||||
Map<TopicPartition, KSMemberDescription> tpMemberMap = new HashMap<>();
|
||||
|
||||
//如果不是connect集群
|
||||
// 如果不是connect集群
|
||||
if (!groupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
|
||||
for (KSMemberDescription description : groupDescription.members()) {
|
||||
// 如果是 Consumer 的 Description ,则 Assignment 的类型为 KSMemberConsumerAssignment 的
|
||||
KSMemberConsumerAssignment assignment = (KSMemberConsumerAssignment) description.assignment();
|
||||
for (TopicPartition tp : assignment.topicPartitions()) {
|
||||
tpMemberMap.put(tp, description);
|
||||
@@ -245,6 +276,52 @@ public class GroupManagerImpl implements GroupManager {
|
||||
return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterPhyId());
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getClusterPhyId()));
|
||||
}
|
||||
|
||||
|
||||
// 按照group纬度进行删除
|
||||
if (ValidateUtils.isBlank(dto.getGroupName())) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "groupName不允许为空");
|
||||
}
|
||||
if (DeleteGroupTypeEnum.GROUP.getCode().equals(dto.getDeleteType())) {
|
||||
return opGroupService.deleteGroupOffset(
|
||||
new DeleteGroupParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP),
|
||||
operator
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// 按照topic纬度进行删除
|
||||
if (ValidateUtils.isBlank(dto.getTopicName())) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空");
|
||||
}
|
||||
if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) {
|
||||
return opGroupService.deleteGroupTopicOffset(
|
||||
new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()),
|
||||
operator
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// 按照partition纬度进行删除
|
||||
if (ValidateUtils.isNullOrLessThanZero(dto.getPartitionId())) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0");
|
||||
}
|
||||
if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) {
|
||||
return opGroupService.deleteGroupTopicPartitionOffset(
|
||||
new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()),
|
||||
operator
|
||||
);
|
||||
}
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "deleteType类型错误");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList) {
|
||||
// 获取指标
|
||||
@@ -256,11 +333,54 @@ public class GroupManagerImpl implements GroupManager {
|
||||
);
|
||||
if (metricsListResult.failed()) {
|
||||
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
|
||||
log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
|
||||
LOGGER.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
|
||||
}
|
||||
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> poList, Integer timeoutUnitMs) {
|
||||
Set<String> requestedGroupSet = new HashSet<>();
|
||||
|
||||
// 获取指标
|
||||
Map<String, Map<String, Float>> groupTopicLagMap = new ConcurrentHashMap<>();
|
||||
poList.forEach(elem -> {
|
||||
if (requestedGroupSet.contains(elem.getGroupName())) {
|
||||
// 该Group已经处理过
|
||||
return;
|
||||
}
|
||||
|
||||
requestedGroupSet.add(elem.getGroupName());
|
||||
ApiCallThreadPoolService.runnableTask(
|
||||
String.format("clusterPhyId=%d||groupName=%s||msg=getGroupTopicLag", clusterPhyId, elem.getGroupName()),
|
||||
timeoutUnitMs,
|
||||
() -> {
|
||||
Result<List<GroupMetrics>> listResult = groupMetricService.collectGroupMetricsFromKafka(clusterPhyId, elem.getGroupName(), GroupMetricVersionItems.GROUP_METRIC_LAG);
|
||||
if (listResult == null || !listResult.hasData()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Float> lagMetricMap = new HashMap<>();
|
||||
listResult.getData().forEach(item -> {
|
||||
Float newLag = item.getMetric(GroupMetricVersionItems.GROUP_METRIC_LAG);
|
||||
if (newLag == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Float oldLag = lagMetricMap.getOrDefault(item.getTopic(), newLag);
|
||||
lagMetricMap.put(item.getTopic(), Math.max(oldLag, newLag));
|
||||
});
|
||||
|
||||
groupTopicLagMap.put(elem.getGroupName(), lagMetricMap);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
ApiCallThreadPoolService.waitResult();
|
||||
|
||||
return this.convert2GroupTopicOverviewVOList(poList, groupTopicLagMap);
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
@@ -314,13 +434,22 @@ public class GroupManagerImpl implements GroupManager {
|
||||
metricsList = new ArrayList<>();
|
||||
}
|
||||
|
||||
// <GroupName, <TopicName, GroupMetrics>>
|
||||
Map<String, Map<String, GroupMetrics>> metricsMap = new HashMap<>();
|
||||
// <GroupName, <TopicName, lag>>
|
||||
Map<String, Map<String, Float>> metricsMap = new HashMap<>();
|
||||
metricsList.stream().forEach(elem -> {
|
||||
Float metricValue = elem.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG);
|
||||
if (metricValue == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
metricsMap.putIfAbsent(elem.getGroup(), new HashMap<>());
|
||||
metricsMap.get(elem.getGroup()).put(elem.getTopic(), elem);
|
||||
metricsMap.get(elem.getGroup()).put(elem.getTopic(), metricValue);
|
||||
});
|
||||
|
||||
return this.convert2GroupTopicOverviewVOList(poList, metricsMap);
|
||||
}
|
||||
|
||||
private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(List<GroupMemberPO> poList, Map<String, Map<String, Float>> metricsMap) {
|
||||
List<GroupTopicOverviewVO> voList = new ArrayList<>();
|
||||
for (GroupMemberPO po: poList) {
|
||||
GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class);
|
||||
@@ -328,9 +457,9 @@ public class GroupManagerImpl implements GroupManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
GroupMetrics metrics = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName());
|
||||
if (metrics != null) {
|
||||
vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG)));
|
||||
Float metricValue = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName());
|
||||
if (metricValue != null) {
|
||||
vo.setMaxLag(ConvertUtil.Float2Long(metricValue));
|
||||
}
|
||||
|
||||
voList.add(vo);
|
||||
|
||||
@@ -19,4 +19,9 @@ public interface OpTopicManager {
|
||||
* 扩分区
|
||||
*/
|
||||
Result<Void> expandTopic(TopicExpansionDTO dto, String operator);
|
||||
|
||||
/**
|
||||
* 清空Topic
|
||||
*/
|
||||
Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator);
|
||||
}
|
||||
|
||||
@@ -10,10 +10,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
||||
@@ -156,6 +158,16 @@ public class OpTopicManagerImpl implements OpTopicManager {
|
||||
return rv;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
|
||||
// 清空Topic
|
||||
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
|
||||
if (rv.failed()) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||
@@ -38,6 +39,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
@@ -45,8 +47,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
@@ -60,7 +61,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
public class TopicStateManagerImpl implements TopicStateManager {
|
||||
private static final ILog log = LogFactory.getLog(TopicStateManagerImpl.class);
|
||||
private static final ILog LOGGER = LogFactory.getLog(TopicStateManagerImpl.class);
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
@@ -89,6 +90,9 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
@Autowired
|
||||
private GroupManager groupManager;
|
||||
|
||||
@Autowired
|
||||
private KSConfigUtils ksConfigUtils;
|
||||
|
||||
@Override
|
||||
public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException {
|
||||
Topic topic = topicService.getTopic(clusterPhyId, topicName);
|
||||
@@ -101,7 +105,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
TopicBrokerAllVO allVO = new TopicBrokerAllVO();
|
||||
|
||||
allVO.setTotal(topic.getBrokerIdSet().size());
|
||||
allVO.setLive((int)brokerMap.values().stream().filter(elem -> elem.alive()).count());
|
||||
allVO.setLive((int)brokerMap.values().stream().filter(Broker::alive).count());
|
||||
allVO.setDead(allVO.getTotal() - allVO.getLive());
|
||||
|
||||
allVO.setPartitionCount(topic.getPartitionNum());
|
||||
@@ -153,97 +157,28 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
return Result.buildFromIgnoreData(endOffsetsMapResult);
|
||||
}
|
||||
|
||||
List<TopicRecordVO> voList = new ArrayList<>();
|
||||
// 数据采集
|
||||
List<TopicRecordVO> voList = this.getTopicMessages(clusterPhy, topicName, beginOffsetsMapResult.getData(), endOffsetsMapResult.getData(), startTime, dto);
|
||||
|
||||
KafkaConsumer<String, String> kafkaConsumer = null;
|
||||
try {
|
||||
// 创建kafka-consumer
|
||||
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
|
||||
|
||||
List<TopicPartition> partitionList = new ArrayList<>();
|
||||
long maxMessage = 0;
|
||||
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
|
||||
long begin = beginOffsetsMapResult.getData().get(entry.getKey());
|
||||
long end = entry.getValue();
|
||||
if (begin == end){
|
||||
continue;
|
||||
}
|
||||
maxMessage += end - begin;
|
||||
partitionList.add(entry.getKey());
|
||||
}
|
||||
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
||||
kafkaConsumer.assign(partitionList);
|
||||
|
||||
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
|
||||
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
|
||||
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||
partitionList.forEach(topicPartition -> {
|
||||
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
|
||||
});
|
||||
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
||||
}
|
||||
|
||||
for (TopicPartition partition : partitionList) {
|
||||
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到最旧
|
||||
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
|
||||
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到指定时间
|
||||
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
|
||||
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到指定位置
|
||||
|
||||
} else {
|
||||
// 默认,重置到最新
|
||||
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
|
||||
}
|
||||
}
|
||||
|
||||
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
||||
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
||||
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
||||
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord));
|
||||
if (voList.size() >= dto.getMaxRecords()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 超时则返回
|
||||
if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs()
|
||||
|| voList.size() > dto.getMaxRecords()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 排序
|
||||
if (ObjectUtils.isNotEmpty(voList)) {
|
||||
// 默认按时间倒序排序
|
||||
if (StringUtils.isBlank(dto.getSortType())) {
|
||||
dto.setSortType(SortTypeEnum.DESC.getSortType());
|
||||
}
|
||||
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
|
||||
}
|
||||
|
||||
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
|
||||
} catch (Exception e) {
|
||||
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);
|
||||
|
||||
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
|
||||
} finally {
|
||||
if (kafkaConsumer != null) {
|
||||
try {
|
||||
kafkaConsumer.close(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
// 排序
|
||||
if (ValidateUtils.isBlank(dto.getSortType())) {
|
||||
// 默认按时间倒序排序
|
||||
dto.setSortType(SortTypeEnum.DESC.getSortType());
|
||||
}
|
||||
if (ValidateUtils.isBlank(dto.getSortField())) {
|
||||
// 默认按照timestampUnitMs字段排序
|
||||
dto.setSortField(PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD);
|
||||
}
|
||||
|
||||
if (PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD.equals(dto.getSortField())) {
|
||||
// 如果是时间类型,则第二排序规则是offset
|
||||
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_OFFSET_SORTED_FIELD, dto.getSortType());
|
||||
} else {
|
||||
// 如果是非时间类型,则第二排序规则是时间
|
||||
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD, dto.getSortType());
|
||||
}
|
||||
|
||||
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -298,26 +233,37 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
|
||||
@Override
|
||||
public Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
List<Partition> partitionList = partitionService.listPartitionByTopic(clusterPhyId, topicName);
|
||||
if (ValidateUtils.isEmptyList(partitionList)) {
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
|
||||
if (metricsResult.failed()) {
|
||||
// 仅打印错误日志,但是不直接返回错误
|
||||
log.error(
|
||||
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed",
|
||||
clusterPhyId, topicName, metricsResult
|
||||
);
|
||||
}
|
||||
|
||||
// 转map
|
||||
Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
|
||||
if (metricsResult.hasData()) {
|
||||
for (PartitionMetrics metrics: metricsResult.getData()) {
|
||||
metricsMap.put(metrics.getPartitionId(), metrics);
|
||||
}
|
||||
ApiCallThreadPoolService.runnableTask(
|
||||
String.format("clusterPhyId=%d||topicName=%s||method=getTopicPartitions", clusterPhyId, topicName),
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTime),
|
||||
() -> {
|
||||
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
|
||||
if (metricsResult.failed()) {
|
||||
// 仅打印错误日志,但是不直接返回错误
|
||||
LOGGER.error(
|
||||
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from kafka failed",
|
||||
clusterPhyId, topicName, metricsResult
|
||||
);
|
||||
}
|
||||
|
||||
for (PartitionMetrics metrics: metricsResult.getData()) {
|
||||
metricsMap.put(metrics.getPartitionId(), metrics);
|
||||
}
|
||||
}
|
||||
);
|
||||
boolean finished = ApiCallThreadPoolService.waitResultAndReturnFinished(1);
|
||||
|
||||
if (!finished && metricsMap.isEmpty()) {
|
||||
// 未完成 -> 打印日志
|
||||
LOGGER.error("method=getTopicPartitions||clusterPhyId={}||topicName={}||msg=get metrics from kafka failed", clusterPhyId, topicName);
|
||||
}
|
||||
|
||||
List<TopicPartitionVO> voList = new ArrayList<>();
|
||||
@@ -336,7 +282,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
|
||||
// Broker统计信息
|
||||
vo.setBrokerCount(brokerMap.size());
|
||||
vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(elem -> elem.alive()).count());
|
||||
vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(Broker::alive).count());
|
||||
vo.setDeadBrokerCount(vo.getBrokerCount() - vo.getLiveBrokerCount());
|
||||
|
||||
// Partition统计信息
|
||||
@@ -360,13 +306,19 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) {
|
||||
long startTimeUnitMs = System.currentTimeMillis();
|
||||
|
||||
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto);
|
||||
|
||||
if (!paginationResult.hasData()) {
|
||||
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
|
||||
}
|
||||
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(
|
||||
clusterPhyId,
|
||||
paginationResult.getData().getBizData(),
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
|
||||
);
|
||||
|
||||
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
|
||||
}
|
||||
@@ -386,11 +338,8 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
// ignore
|
||||
return true;
|
||||
}
|
||||
if (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue));
|
||||
}
|
||||
|
||||
private TopicBrokerSingleVO getTopicBrokerSingle(Long clusterPhyId,
|
||||
@@ -450,4 +399,90 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords)));
|
||||
return props;
|
||||
}
|
||||
|
||||
private List<TopicRecordVO> getTopicMessages(ClusterPhy clusterPhy,
|
||||
String topicName,
|
||||
Map<TopicPartition, Long> beginOffsetsMap,
|
||||
Map<TopicPartition, Long> endOffsetsMap,
|
||||
long startTime,
|
||||
TopicRecordDTO dto) throws AdminOperateException {
|
||||
List<TopicRecordVO> voList = new ArrayList<>();
|
||||
|
||||
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()))) {
|
||||
// 移动到指定位置
|
||||
long maxMessage = this.assignAndSeekToSpecifiedOffset(kafkaConsumer, beginOffsetsMap, endOffsetsMap, dto);
|
||||
|
||||
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
||||
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
||||
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
||||
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord));
|
||||
if (voList.size() >= dto.getMaxRecords()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 超时则返回
|
||||
if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs()
|
||||
|| voList.size() > dto.getMaxRecords()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return voList;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);
|
||||
|
||||
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
private long assignAndSeekToSpecifiedOffset(KafkaConsumer<String, String> kafkaConsumer,
|
||||
Map<TopicPartition, Long> beginOffsetsMap,
|
||||
Map<TopicPartition, Long> endOffsetsMap,
|
||||
TopicRecordDTO dto) {
|
||||
List<TopicPartition> partitionList = new ArrayList<>();
|
||||
long maxMessage = 0;
|
||||
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMap.entrySet()) {
|
||||
long begin = beginOffsetsMap.get(entry.getKey());
|
||||
long end = entry.getValue();
|
||||
if (begin == end){
|
||||
continue;
|
||||
}
|
||||
maxMessage += end - begin;
|
||||
partitionList.add(entry.getKey());
|
||||
}
|
||||
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
||||
kafkaConsumer.assign(partitionList);
|
||||
|
||||
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
|
||||
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
|
||||
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||
partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()));
|
||||
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
||||
}
|
||||
|
||||
for (TopicPartition partition : partitionList) {
|
||||
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到最旧
|
||||
kafkaConsumer.seek(partition, beginOffsetsMap.get(partition));
|
||||
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到指定时间
|
||||
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
|
||||
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到指定位置
|
||||
|
||||
} else {
|
||||
// 默认,重置到最新
|
||||
kafkaConsumer.seek(partition, Math.max(beginOffsetsMap.get(partition), endOffsetsMap.get(partition) - dto.getMaxRecords()));
|
||||
}
|
||||
}
|
||||
|
||||
return maxMessage;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,8 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectClusterMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems.*;
|
||||
|
||||
@Service
|
||||
@@ -123,6 +125,42 @@ public class VersionControlManagerImpl implements VersionControlManager {
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_RATE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX, true));
|
||||
|
||||
// Connect Cluster
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_STARTUP_ATTEMPTS_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_STARTUP_FAILURE_PERCENTAGE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_STARTUP_FAILURE_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_STARTUP_ATTEMPTS_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_STARTUP_FAILURE_PERCENTAGE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_STARTUP_FAILURE_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_COLLECT_COST_TIME, true));
|
||||
|
||||
|
||||
// Connect Connector
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_HEALTH_STATE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_HEALTH_CHECK_PASSED, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_HEALTH_CHECK_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_COLLECT_COST_TIME, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_CONNECTOR_TOTAL_TASK_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_CONNECTOR_RUNNING_TASK_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_ACTIVE_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_POLL_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_WRITE_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_ACTIVE_COUNT, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_READ_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_SEND_TOTAL, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_DEADLETTERQUEUE_PRODUCE_FAILURES, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_DEADLETTERQUEUE_PRODUCE_REQUESTS, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_TOTAL_ERRORS_LOGGED, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_POLL_RATE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_WRITE_RATE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_READ_RATE, true));
|
||||
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_SEND_RATE, true));
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Autowired
|
||||
|
||||
Reference in New Issue
Block a user