mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-08 07:32:11 +08:00
bugfix: TopicService && TopicServiceImpl && ZookeeperServiceImpl
This commit is contained in:
@@ -104,6 +104,13 @@ public interface TopicService {
|
|||||||
*/
|
*/
|
||||||
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
|
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断topic是否有数据写入,即分区topic的offset变化
|
||||||
|
* @param physicalClusterId 物理集群Id
|
||||||
|
* @param topicName topic名称
|
||||||
|
* @param latestTime 离当前多久开始计算
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
|
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -247,11 +247,11 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
@Override
|
@Override
|
||||||
public List<TopicPartitionDTO> getTopicPartitionDTO(ClusterDO clusterDO, String topicName, Boolean needDetail) {
|
public List<TopicPartitionDTO> getTopicPartitionDTO(ClusterDO clusterDO, String topicName, Boolean needDetail) {
|
||||||
if (ValidateUtils.isNull(clusterDO) || ValidateUtils.isNull(topicName)) {
|
if (ValidateUtils.isNull(clusterDO) || ValidateUtils.isNull(topicName)) {
|
||||||
return null;
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
|
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
|
||||||
if (ValidateUtils.isNull(topicMetadata)) {
|
if (ValidateUtils.isNull(topicMetadata)) {
|
||||||
return null;
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
List<PartitionState> partitionStateList = KafkaZookeeperUtils.getTopicPartitionState(
|
List<PartitionState> partitionStateList = KafkaZookeeperUtils.getTopicPartitionState(
|
||||||
@@ -528,7 +528,7 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
public List<PartitionOffsetDTO> getPartitionOffsetList(ClusterDO clusterDO, String topicName, Long timestamp) {
|
public List<PartitionOffsetDTO> getPartitionOffsetList(ClusterDO clusterDO, String topicName, Long timestamp) {
|
||||||
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
|
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
|
||||||
if (topicMetadata == null) {
|
if (topicMetadata == null) {
|
||||||
return null;
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||||
for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) {
|
for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) {
|
||||||
@@ -572,7 +572,7 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
kafkaConsumer.close();
|
kafkaConsumer.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> fetchTopicData(KafkaConsumer kafkaConsumer, ClusterDO clusterDO, String topicName, TopicDataSampleDTO reqObj) {
|
private List<String> fetchTopicData(KafkaConsumer kafkaConsumer, ClusterDO clusterDO, String topicName, TopicDataSampleDTO reqObj) {
|
||||||
@@ -585,7 +585,7 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
tpList.add(new TopicPartition(topicName, partitionId));
|
tpList.add(new TopicPartition(topicName, partitionId));
|
||||||
}
|
}
|
||||||
if (ValidateUtils.isEmptyList(tpList)) {
|
if (ValidateUtils.isEmptyList(tpList)) {
|
||||||
return null;
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
kafkaConsumer.assign(tpList);
|
kafkaConsumer.assign(tpList);
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ public class ZookeeperServiceImpl implements ZookeeperService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result openTopicJmx(Long clusterId, String topicName, TopicJmxSwitch jmxSwitch) {
|
public Result openTopicJmx(Long clusterId, String topicName, TopicJmxSwitch jmxSwitch) {
|
||||||
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(topicName) || ValidateUtils.isNull(jmxSwitch)) {
|
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(topicName)) {
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user