mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
bugfix: TopicServiceImpl && JmxServiceImpl && ConsumerService && ConsumerServiceImpl
This commit is contained in:
@@ -42,6 +42,13 @@ public interface ConsumerService {
|
||||
*/
|
||||
List<String> getConsumerGroupConsumedTopicList(Long clusterId, String consumerGroup, String location);
|
||||
|
||||
/**
|
||||
* 获取消费者offset
|
||||
* @param clusterDO 集群
|
||||
* @param topicName topic
|
||||
* @param consumerGroup 消费组
|
||||
* @return Map<partitionId, offset>
|
||||
*/
|
||||
Map<Integer, Long> getConsumerOffset(ClusterDO clusterDO, String topicName, ConsumerGroup consumerGroup);
|
||||
|
||||
/**
|
||||
@@ -52,7 +59,20 @@ public interface ConsumerService {
|
||||
ConsumerGroup consumerGroup,
|
||||
List<PartitionOffsetDTO> partitionOffsetDTOList);
|
||||
|
||||
/**
|
||||
* 获取每个集群消费组的个数
|
||||
* @param clusterDOList 物理集群列表
|
||||
* @return Map<clusterId, consumerGroupNums>
|
||||
*/
|
||||
Map<Long, Integer> getConsumerGroupNumMap(List<ClusterDO> clusterDOList);
|
||||
|
||||
/**
|
||||
* 验证消费组是否存在
|
||||
* @param offsetLocation offset存放位置
|
||||
* @param id 集群id
|
||||
* @param topicName topic
|
||||
* @param consumerGroup 消费组
|
||||
* @return true:存在,false:不存在
|
||||
*/
|
||||
boolean checkConsumerGroupExist(OffsetLocationEnum offsetLocation, Long id, String topicName, String consumerGroup);
|
||||
}
|
||||
|
||||
@@ -159,7 +159,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
if (topicMetadata == null) {
|
||||
logger.warn("class=ConsumerServiceImpl||method=getConsumeDetail||clusterId={}||topicName={}||msg=topicMetadata is null!",
|
||||
clusterDO.getId(), topicName);
|
||||
return null;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<ConsumeDetailDTO> consumerGroupDetailDTOList = null;
|
||||
@@ -170,7 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
}
|
||||
if (consumerGroupDetailDTOList == null) {
|
||||
logger.info("class=ConsumerServiceImpl||method=getConsumeDetail||msg=consumerGroupDetailDTOList is null!");
|
||||
return null;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> topicPartitionLongMap = topicService.getPartitionOffset(clusterDO, topicName, OffsetPosEnum.END);
|
||||
@@ -317,9 +317,6 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
String consumerGroup) {
|
||||
Map<Integer, String> stringOffsetMap =
|
||||
getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroup, topicName);
|
||||
if (ValidateUtils.isNull(stringOffsetMap)) {
|
||||
return new HashMap<>(0);
|
||||
}
|
||||
|
||||
Map<Integer, Long> offsetMap = new HashMap<>(stringOffsetMap.size());
|
||||
for (Map.Entry<Integer, String> entry: stringOffsetMap.entrySet()) {
|
||||
|
||||
@@ -164,9 +164,11 @@ public class JmxServiceImpl implements JmxService {
|
||||
if (ValidateUtils.isNull(jmxConnectorWrap)|| !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
KafkaVersion kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId);
|
||||
|
||||
TopicMetrics metrics = new TopicMetrics(clusterId, topicName);
|
||||
for (MbeanV2 mbeanV2: mbeanV2List) {
|
||||
KafkaVersion kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId);
|
||||
try {
|
||||
getAndSupplyAttributes2BaseMetrics(
|
||||
metrics,
|
||||
|
||||
@@ -416,9 +416,6 @@ public class TopicServiceImpl implements TopicService {
|
||||
topicDO,
|
||||
appDO
|
||||
);
|
||||
if (ValidateUtils.isNull(overview)) {
|
||||
continue;
|
||||
}
|
||||
dtoList.add(overview);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user